From 34cc0072d735f41f13aeb90c5b935b87c7d4c558 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Mon, 13 Apr 2026 17:01:46 -0400 Subject: [PATCH 01/20] Create datadog-metrics-collector crate to collect instance value with tags --- Cargo.lock | 39 +++++++ crates/datadog-metrics-collector/Cargo.toml | 14 +++ .../datadog-metrics-collector/src/instance.rs | 109 ++++++++++++++++++ crates/datadog-metrics-collector/src/lib.rs | 11 ++ crates/datadog-metrics-collector/src/tags.rs | 55 +++++++++ 5 files changed, 228 insertions(+) create mode 100644 crates/datadog-metrics-collector/Cargo.toml create mode 100644 crates/datadog-metrics-collector/src/instance.rs create mode 100644 crates/datadog-metrics-collector/src/lib.rs create mode 100644 crates/datadog-metrics-collector/src/tags.rs diff --git a/Cargo.lock b/Cargo.lock index 962438d..6c13750 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -487,6 +487,15 @@ dependencies = [ "zstd", ] +[[package]] +name = "datadog-metrics-collector" +version = "0.1.0" +dependencies = [ + "dogstatsd", + "libdd-common 1.1.0", + "tracing", +] + [[package]] name = "datadog-opentelemetry" version = "0.3.0" @@ -540,6 +549,7 @@ version = "0.1.0" dependencies = [ "datadog-fips", "datadog-logs-agent", + "datadog-metrics-collector", "datadog-trace-agent", "dogstatsd", "libdd-trace-utils 3.0.1", @@ -1454,6 +1464,35 @@ dependencies = [ "libdd-common 4.0.0", ] +[[package]] +name = "libdd-common" +version = "1.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=d52ee90209cb12a28bdda0114535c1a985a29d95#d52ee90209cb12a28bdda0114535c1a985a29d95" +dependencies = [ + "anyhow", + "cc", + "const_format", + "futures", + "futures-core", + "futures-util", + "hex", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "libc", + "nix", + "pin-project", + "regex", + "serde", + "static_assertions", + "thiserror 1.0.69", + "tokio", + "tower-service", + "windows-sys 0.52.0", +] + [[package]] name = "libdd-common" version = "2.0.1" diff --git 
a/crates/datadog-metrics-collector/Cargo.toml b/crates/datadog-metrics-collector/Cargo.toml new file mode 100644 index 0000000..0153e63 --- /dev/null +++ b/crates/datadog-metrics-collector/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "datadog-metrics-collector" +version = "0.1.0" +edition.workspace = true +license.workspace = true +description = "Collector to read, compute, and submit enhanced metrics in Serverless environments" + +[dependencies] +dogstatsd = { path = "../dogstatsd", default-features = true } +tracing = { version = "0.1", default-features = false } +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95", default-features = false } + +[features] +windows-enhanced-metrics = [] diff --git a/crates/datadog-metrics-collector/src/instance.rs b/crates/datadog-metrics-collector/src/instance.rs new file mode 100644 index 0000000..848e862 --- /dev/null +++ b/crates/datadog-metrics-collector/src/instance.rs @@ -0,0 +1,109 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Instance identity metric collector for Azure Functions. +//! +//! Submits `azure.functions.enhanced.instance` with value 1.0 on each +//! collection tick, tagged with the instance identifier. The env var +//! checked depends on the Azure plan type: +//! +//! - Elastic Premium / Premium: `WEBSITE_INSTANCE_ID` +//! - Flex Consumption / Consumption: `WEBSITE_POD_NAME` or `CONTAINER_NAME` + +use dogstatsd::aggregator::AggregatorHandle; +use dogstatsd::metric::{Metric, MetricValue, SortedTags}; +use std::env; +use tracing::{debug, error, info}; + +const INSTANCE_METRIC: &str = "azure.functions.enhanced.instance"; + +/// Resolves the instance ID from explicit values (used by tests). 
+fn resolve_instance_id_from( + website_instance_id: Option<&str>, + website_pod_name: Option<&str>, + container_name: Option<&str>, +) -> Option { + website_instance_id + .or(website_pod_name) + .or(container_name) + .map(String::from) +} + +/// Resolves the instance ID from environment variables. +/// +/// Checks in order: +/// 1. `WEBSITE_INSTANCE_ID` (Elastic Premium / Premium plans) +/// 2. `WEBSITE_POD_NAME` (Flex Consumption plans) +/// 3. `CONTAINER_NAME` (Consumption plans) +fn resolve_instance_id() -> Option { + resolve_instance_id_from( + env::var("WEBSITE_INSTANCE_ID").ok().as_deref(), + env::var("WEBSITE_POD_NAME").ok().as_deref(), + env::var("CONTAINER_NAME").ok().as_deref(), + ) +} + +pub struct InstanceMetricsCollector { + aggregator: AggregatorHandle, + tags: Option, + instance_id: Option, +} + +impl InstanceMetricsCollector { + pub fn new(aggregator: AggregatorHandle, tags: Option) -> Self { + let instance_id = resolve_instance_id(); + if let Some(ref id) = instance_id { + info!("Instance ID resolved: {}", id); + } else { + debug!("No instance ID found, instance metric will not be submitted"); + } + Self { + aggregator, + tags, + instance_id, + } + } + + pub fn collect_and_submit(&self) { + let Some(ref instance_id) = self.instance_id else { + debug!("No instance ID available, skipping instance metric"); + return; + }; + + // Build tags: start with shared tags, add instance + let instance_tag = format!("instance:{}", instance_id); + let tags = match &self.tags { + Some(existing) => { + let mut combined = existing.clone(); + if let Ok(id_tag) = SortedTags::parse(&instance_tag) { + combined.extend(&id_tag); + } + Some(combined) + } + None => SortedTags::parse(&instance_tag).ok(), + }; + + let metric = Metric::new(INSTANCE_METRIC.into(), MetricValue::gauge(1.0), tags, None); + + if let Err(e) = self.aggregator.insert_batch(vec![metric]) { + error!("Failed to insert instance metric: {}", e); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + 
#[test] + fn test_resolve_instance_id_falls_back_to_pod_name() { + let id = resolve_instance_id_from(None, Some("pod-xyz"), Some("container-123")); + assert_eq!(id, Some("pod-xyz".to_string())); + } + + #[test] + fn test_resolve_instance_id_falls_back_to_container_name() { + let id = resolve_instance_id_from(None, None, Some("container-123")); + assert_eq!(id, Some("container-123".to_string())); + } +} diff --git a/crates/datadog-metrics-collector/src/lib.rs b/crates/datadog-metrics-collector/src/lib.rs new file mode 100644 index 0000000..5f22d37 --- /dev/null +++ b/crates/datadog-metrics-collector/src/lib.rs @@ -0,0 +1,11 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(test), deny(clippy::panic))] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] +#![cfg_attr(not(test), deny(clippy::expect_used))] +#![cfg_attr(not(test), deny(clippy::todo))] +#![cfg_attr(not(test), deny(clippy::unimplemented))] + +pub mod instance; +pub mod tags; diff --git a/crates/datadog-metrics-collector/src/tags.rs b/crates/datadog-metrics-collector/src/tags.rs new file mode 100644 index 0000000..c6db691 --- /dev/null +++ b/crates/datadog-metrics-collector/src/tags.rs @@ -0,0 +1,55 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Shared tag builder for enhanced metrics. +//! +//! Tags are attached to all enhanced metrics submitted by the metrics collector. + +use dogstatsd::metric::SortedTags; +use libdd_common::azure_app_services; +use std::env; + +/// Builds the common tags for all enhanced metrics. +/// +/// Sources: +/// - Azure metadata (resource_group, subscription_id, name) from libdd_common +/// - Environment variables (region, plan_tier, service, env, version, serverless_compat_version) +/// +/// The DogStatsD origin tag (e.g. `origin:azurefunction`) is added by the metrics aggregator, +/// not here. 
+pub fn build_enhanced_metrics_tags() -> Option { + let mut tag_parts = Vec::new(); + + if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION { + let aas_tags = [ + ("resource_group", aas_metadata.get_resource_group()), + ("subscription_id", aas_metadata.get_subscription_id()), + ("name", aas_metadata.get_site_name()), + ]; + for (name, value) in aas_tags { + if value != "unknown" { + tag_parts.push(format!("{}:{}", name, value)); + } + } + } + + for (tag_name, env_var) in [ + ("region", "REGION_NAME"), + ("plan_tier", "WEBSITE_SKU"), + ("service", "DD_SERVICE"), + ("env", "DD_ENV"), + ("version", "DD_VERSION"), + ("serverless_compat_version", "DD_SERVERLESS_COMPAT_VERSION"), + ] { + if let Ok(val) = env::var(env_var) + && !val.is_empty() + { + tag_parts.push(format!("{}:{}", tag_name, val)); + } + } + + if tag_parts.is_empty() { + return None; + } + SortedTags::parse(&tag_parts.join(",")).ok() +} From 26355c5b9b256709cb14e6baa5c39716fc5a9f76 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Mon, 13 Apr 2026 17:22:14 -0400 Subject: [PATCH 02/20] Categorize metrics with azure.functions prefix as enhanced metrics --- crates/dogstatsd/src/origin.rs | 46 +++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index fc025b9..98e18c2 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -18,6 +18,7 @@ const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; const DATADOG_PREFIX: &str = "datadog."; const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; +const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; const JVM_PREFIX: &str = "jvm."; const RUNTIME_PREFIX: &str = "runtime."; @@ -83,15 +84,17 @@ impl Metric { .join("."); // Determine the service based on metric prefix first - let service = if metric_name.starts_with(JVM_PREFIX) - || metric_name.starts_with(RUNTIME_PREFIX) - { - 
OriginService::ServerlessRuntime - } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX { - OriginService::ServerlessEnhanced - } else { - OriginService::ServerlessCustom - }; + let service = + if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) { + OriginService::ServerlessRuntime + } else if metric_prefix == AWS_LAMBDA_PREFIX + || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX + || metric_prefix == AZURE_FUNCTIONS_PREFIX + { + OriginService::ServerlessEnhanced + } else { + OriginService::ServerlessCustom + }; // Then determine the category based on tags let category = if has_tag_value(&tags, AWS_LAMBDA_TAG_KEY, "") { @@ -351,6 +354,31 @@ mod tests { assert_eq!(origin, None); } + #[test] + fn test_find_metric_origin_azure_functions_enhanced() { + let tags = SortedTags::parse("origin:azurefunction").unwrap(); + let metric = Metric { + id: 0, + name: "azure.functions.enhanced.instance".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::AzureFunctionsMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } + #[test] fn test_find_metric_origin_unknown() { let tags = SortedTags::parse("unknown:tag").unwrap(); From c5d8f8f7b9ebf3a84f7e2adee36ef63c47f4ce13 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Mon, 13 Apr 2026 17:23:19 -0400 Subject: [PATCH 03/20] Use metrics collector in main loop and refactor start_dogstatsd --- Cargo.lock | 1 - crates/datadog-serverless-compat/src/main.rs | 207 ++++++++++++------- 2 files changed, 131 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c13750..d4d1881 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -549,7 +549,6 @@ version = "0.1.0" dependencies = [ 
"datadog-fips", "datadog-logs-agent", - "datadog-metrics-collector", "datadog-trace-agent", "dogstatsd", "libdd-trace-utils 3.0.1", diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index d0af3db..947a79e 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -41,10 +41,12 @@ use dogstatsd::{ util::parse_metric_namespace, }; +use datadog_metrics_collector::instance::InstanceMetricsCollector; use dogstatsd::metric::{EMPTY_TAGS, SortedTags}; use tokio_util::sync::CancellationToken; const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; +const ENHANCED_METRICS_COLLECTION_INTERVAL_SECS: u64 = 10; const DOGSTATSD_TIMEOUT_DURATION: Duration = Duration::from_secs(5); const DEFAULT_DOGSTATSD_PORT: u16 = 8125; const DEFAULT_LOG_INTAKE_PORT: u16 = 10517; @@ -126,6 +128,18 @@ pub async fn main() { .and_then(|v| v.parse::().ok()) .unwrap_or(DEFAULT_LOG_INTAKE_PORT); + // Only enable enhanced metrics for Linux Azure Functions + #[cfg(not(feature = "windows-enhanced-metrics"))] + let dd_enhanced_metrics = env_type == EnvironmentType::AzureFunction + && env::var("DD_ENHANCED_METRICS_ENABLED") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); + + // Enhanced metrics are not yet supported in Windows environments + #[cfg(feature = "windows-enhanced-metrics")] + let dd_enhanced_metrics = false; + + let dd_agent_stats_computation_enabled = env::var("DD_AGENT_STATS_COMPUTATION_ENABLED") .map(|val| val.to_lowercase() == "true") .unwrap_or(false); @@ -213,30 +227,59 @@ pub async fn main() { } }); - let (metrics_flusher, _aggregator_handle) = if dd_use_dogstatsd { - debug!("Starting dogstatsd"); - let (_, metrics_flusher, aggregator_handle) = start_dogstatsd( - dd_dogstatsd_port, + let needs_aggregator = dd_use_dogstatsd || dd_enhanced_metrics; + + // The aggregator is shared between dogstatsd and enhanced metrics. 
+ // It is started independently so that either can be enabled without the other. + let (metrics_flusher, aggregator_handle) = if needs_aggregator { + debug!("Creating metrics flusher and aggregator"); + + let (flusher, handle) = start_aggregator( dd_api_key.clone(), dd_site, https_proxy.clone(), dogstatsd_tags, - dd_statsd_metric_namespace, - #[cfg(all(windows, feature = "windows-pipes"))] - dd_dogstatsd_windows_pipe_name.clone(), ) .await; - if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { - info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + + if dd_use_dogstatsd { + debug!("Starting dogstatsd"); + let _ = start_dogstatsd_listener( + dd_dogstatsd_port, + handle.clone(), + dd_statsd_metric_namespace, + #[cfg(all(windows, feature = "windows-pipes"))] + dd_dogstatsd_windows_pipe_name.clone(), + ) + .await; + if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { + info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + } else { + info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + } } else { - info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + info!("dogstatsd disabled"); } - (metrics_flusher, Some(aggregator_handle)) + (flusher, Some(handle)) } else { - info!("dogstatsd disabled"); + info!("dogstatsd and enhanced metrics disabled"); (None, None) }; + let instance_collector = if dd_enhanced_metrics && metrics_flusher.is_some() { + aggregator_handle.as_ref().map(|handle| { + let tags = datadog_metrics_collector::tags::build_enhanced_metrics_tags(); + InstanceMetricsCollector::new(handle.clone(), tags) + }) + } else { + if !dd_enhanced_metrics { + info!("Enhanced metrics disabled"); + } else { + info!("Enhanced metrics enabled but metrics flusher not found"); + } + None + }; + let (log_flusher, _log_aggregator_handle): (Option, Option) = if dd_logs_enabled { debug!("Starting log agent"); @@ -256,48 +299,54 @@ pub async fn main() { }; let mut 
flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); + let mut enhanced_metrics_collection_interval = interval(Duration::from_secs( + ENHANCED_METRICS_COLLECTION_INTERVAL_SECS, + )); flush_interval.tick().await; // discard first tick, which is instantaneous + enhanced_metrics_collection_interval.tick().await; // Builders for log batches that failed transiently in the previous flush // cycle. They are redriven on the next cycle before new batches are sent. let mut pending_log_retries: Vec = Vec::new(); loop { - flush_interval.tick().await; - - if let Some(metrics_flusher) = metrics_flusher.as_ref() { - debug!("Flushing dogstatsd metrics"); - metrics_flusher.flush().await; - } + tokio::select! { + _ = flush_interval.tick() => { + if let Some(metrics_flusher) = metrics_flusher.clone() { + debug!("Flushing dogstatsd metrics"); + tokio::spawn(async move { + metrics_flusher.flush().await; + }); + } - if let Some(log_flusher) = log_flusher.as_ref() { - debug!("Flushing log agent"); - let retry_in = std::mem::take(&mut pending_log_retries); - let failed = log_flusher.flush(retry_in).await; - if !failed.is_empty() { - // TODO: surface flush failures into health/metrics telemetry so - // operators have a durable signal beyond log lines when logs are - // being dropped (e.g. increment a statsd counter or set a gauge). 
- warn!( - "log agent flush failed for {} batch(es); will retry next cycle", - failed.len() - ); - pending_log_retries = failed; + if let Some(log_flusher) = log_flusher.as_ref() { + debug!("Flushing log agent"); + let retry_in = std::mem::take(&mut pending_log_retries); + let failed = log_flusher.flush(retry_in).await; + if !failed.is_empty() { + warn!( + "log agent flush failed for {} batch(es); will retry next cycle", + failed.len() + ); + pending_log_retries = failed; + } + } + } + _ = enhanced_metrics_collection_interval.tick() => { + if let Some(ref collector) = instance_collector { + collector.collect_and_submit(); + } } } } } -async fn start_dogstatsd( - port: u16, +async fn start_aggregator( dd_api_key: Option, dd_site: String, https_proxy: Option, dogstatsd_tags: &str, - metric_namespace: Option, - #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option, -) -> (CancellationToken, Option, AggregatorHandle) { - // 1. Create the aggregator service +) -> (Option, AggregatorHandle) { #[allow(clippy::expect_used)] let (service, handle) = AggregatorService::new( SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), @@ -305,53 +354,17 @@ async fn start_dogstatsd( ) .expect("Failed to create aggregator service"); - // 2. Start the aggregator service in the background tokio::spawn(service.run()); - #[cfg(all(windows, feature = "windows-pipes"))] - let dogstatsd_config = DogStatsDConfig { - host: AGENT_HOST.to_string(), - port, - metric_namespace, - windows_pipe_name, - so_rcvbuf: None, - buffer_size: None, - queue_size: None, - }; - - #[cfg(not(all(windows, feature = "windows-pipes")))] - let dogstatsd_config = DogStatsDConfig { - host: AGENT_HOST.to_string(), - port, - metric_namespace, - so_rcvbuf: None, - buffer_size: None, - queue_size: None, - }; - let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); - - // 3. 
Use handle in DogStatsD (cheap to clone) - let dogstatsd_client = DogStatsD::new( - &dogstatsd_config, - handle.clone(), - dogstatsd_cancel_token.clone(), - ) - .await; - - tokio::spawn(async move { - dogstatsd_client.spin().await; - }); - let metrics_flusher = match dd_api_key { Some(dd_api_key) => { let client = match build_metrics_client(https_proxy, DOGSTATSD_TIMEOUT_DURATION) { Ok(client) => client, Err(e) => { error!("Failed to build HTTP client: {e}, won't flush metrics"); - return (dogstatsd_cancel_token, None, handle); + return (None, handle); } }; - let metrics_intake_url_prefix = match Site::new(dd_site) .map_err(|e| e.to_string()) .and_then(|site| { @@ -360,7 +373,7 @@ async fn start_dogstatsd( Ok(prefix) => prefix, Err(e) => { error!("Failed to create metrics intake URL: {e}, won't flush metrics"); - return (dogstatsd_cancel_token, None, handle); + return (None, handle); } }; @@ -380,7 +393,49 @@ async fn start_dogstatsd( } }; - (dogstatsd_cancel_token, metrics_flusher, handle) + (metrics_flusher, handle) +} + +async fn start_dogstatsd_listener( + port: u16, + handle: AggregatorHandle, + metric_namespace: Option, + #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option, +) -> CancellationToken { + #[cfg(all(windows, feature = "windows-pipes"))] + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + metric_namespace, + windows_pipe_name, + so_rcvbuf: None, + buffer_size: None, + queue_size: None, + }; + + #[cfg(not(all(windows, feature = "windows-pipes")))] + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + metric_namespace, + so_rcvbuf: None, + buffer_size: None, + queue_size: None, + }; + let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); + + let dogstatsd_client = DogStatsD::new( + &dogstatsd_config, + handle.clone(), + dogstatsd_cancel_token.clone(), + ) + .await; + + tokio::spawn(async move { + dogstatsd_client.spin().await; + }); + + 
dogstatsd_cancel_token } fn build_metrics_client( From 4b170a0c4bb3abe6ff98960bfa17196967afc054 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Mon, 13 Apr 2026 17:23:55 -0400 Subject: [PATCH 04/20] Add windows-enhanced-metrics feature to CI --- .github/workflows/build-datadog-serverless-compat.yml | 4 ++-- .github/workflows/cargo.yml | 2 +- crates/datadog-serverless-compat/Cargo.toml | 2 ++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build-datadog-serverless-compat.yml b/.github/workflows/build-datadog-serverless-compat.yml index 8c88500..070938f 100644 --- a/.github/workflows/build-datadog-serverless-compat.yml +++ b/.github/workflows/build-datadog-serverless-compat.yml @@ -56,7 +56,7 @@ jobs: retention-days: 3 - if: ${{ inputs.runner == 'windows-2022' }} shell: bash - run: cargo build --release -p datadog-serverless-compat --features windows-pipes + run: cargo build --release -p datadog-serverless-compat --features windows-pipes,windows-enhanced-metrics - if: ${{ inputs.runner == 'windows-2022' }} uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # 4.6.2 with: @@ -69,7 +69,7 @@ jobs: rustup target add i686-pc-windows-msvc cargo build --release -p datadog-serverless-compat \ --target i686-pc-windows-msvc \ - --features windows-pipes + --features windows-pipes,windows-enhanced-metrics - if: ${{ inputs.runner == 'windows-2022' }} uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # 4.6.2 with: diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index fc640c5..7863afd 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -95,7 +95,7 @@ jobs: - shell: bash run: | if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then - cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes + cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes,datadog-serverless-compat/windows-enhanced-metrics else cargo 
nextest run --workspace fi diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index 960c186..c33e114 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -8,9 +8,11 @@ description = "Binary to run trace-agent and dogstatsd servers in Serverless env [features] default = [] windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"] +windows-enhanced-metrics = ["datadog-metrics-collector/windows-enhanced-metrics"] [dependencies] datadog-logs-agent = { path = "../datadog-logs-agent" } +datadog-metrics-collector = { path = "../datadog-metrics-collector" } datadog-trace-agent = { path = "../datadog-trace-agent" } libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "d7eef8031192d0ee79ba64cd824804c5a57abacf" } datadog-fips = { path = "../datadog-fips", default-features = false } From 6dab19806066d90f209a8fba8525fc134ae150bc Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Tue, 14 Apr 2026 16:24:57 -0400 Subject: [PATCH 05/20] Update collection interval, change info log to debug, and update libdatadog rev --- Cargo.lock | 31 +------------------ crates/datadog-metrics-collector/Cargo.toml | 2 +- .../datadog-metrics-collector/src/instance.rs | 4 +-- crates/datadog-serverless-compat/src/main.rs | 2 +- 4 files changed, 5 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4d1881..ecd4bf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -492,7 +492,7 @@ name = "datadog-metrics-collector" version = "0.1.0" dependencies = [ "dogstatsd", - "libdd-common 1.1.0", + "libdd-common 3.0.1", "tracing", ] @@ -1463,35 +1463,6 @@ dependencies = [ "libdd-common 4.0.0", ] -[[package]] -name = "libdd-common" -version = "1.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d52ee90209cb12a28bdda0114535c1a985a29d95#d52ee90209cb12a28bdda0114535c1a985a29d95" -dependencies = [ - "anyhow", - "cc", - "const_format", - 
"futures", - "futures-core", - "futures-util", - "hex", - "http", - "http-body", - "http-body-util", - "hyper", - "hyper-util", - "libc", - "nix", - "pin-project", - "regex", - "serde", - "static_assertions", - "thiserror 1.0.69", - "tokio", - "tower-service", - "windows-sys 0.52.0", -] - [[package]] name = "libdd-common" version = "2.0.1" diff --git a/crates/datadog-metrics-collector/Cargo.toml b/crates/datadog-metrics-collector/Cargo.toml index 0153e63..09bac29 100644 --- a/crates/datadog-metrics-collector/Cargo.toml +++ b/crates/datadog-metrics-collector/Cargo.toml @@ -8,7 +8,7 @@ description = "Collector to read, compute, and submit enhanced metrics in Server [dependencies] dogstatsd = { path = "../dogstatsd", default-features = true } tracing = { version = "0.1", default-features = false } -libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95", default-features = false } +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", default-features = false } [features] windows-enhanced-metrics = [] diff --git a/crates/datadog-metrics-collector/src/instance.rs b/crates/datadog-metrics-collector/src/instance.rs index 848e862..42aa5ce 100644 --- a/crates/datadog-metrics-collector/src/instance.rs +++ b/crates/datadog-metrics-collector/src/instance.rs @@ -13,7 +13,7 @@ use dogstatsd::aggregator::AggregatorHandle; use dogstatsd::metric::{Metric, MetricValue, SortedTags}; use std::env; -use tracing::{debug, error, info}; +use tracing::{debug, error}; const INSTANCE_METRIC: &str = "azure.functions.enhanced.instance"; @@ -53,7 +53,7 @@ impl InstanceMetricsCollector { pub fn new(aggregator: AggregatorHandle, tags: Option) -> Self { let instance_id = resolve_instance_id(); if let Some(ref id) = instance_id { - info!("Instance ID resolved: {}", id); + debug!("Instance ID resolved: {}", id); } else { debug!("No instance ID found, instance metric will not be 
submitted"); } diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 947a79e..c7d440c 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -46,7 +46,7 @@ use dogstatsd::metric::{EMPTY_TAGS, SortedTags}; use tokio_util::sync::CancellationToken; const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; -const ENHANCED_METRICS_COLLECTION_INTERVAL_SECS: u64 = 10; +const ENHANCED_METRICS_COLLECTION_INTERVAL_SECS: u64 = 1; const DOGSTATSD_TIMEOUT_DURATION: Duration = Duration::from_secs(5); const DEFAULT_DOGSTATSD_PORT: u16 = 8125; const DEFAULT_LOG_INTAKE_PORT: u16 = 10517; From dcb7f1e55dd6fb4bc9fed2b8a734bec43611167b Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Tue, 14 Apr 2026 17:00:40 -0400 Subject: [PATCH 06/20] Change instance metric collection interval to 3, update comments --- Cargo.lock | 31 +++++++++++ .../datadog-metrics-collector/src/instance.rs | 14 ++--- crates/datadog-serverless-compat/src/main.rs | 17 +++--- crates/dogstatsd/src/origin.rs | 52 +++++++++---------- 4 files changed, 70 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ecd4bf6..b27d633 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -549,6 +549,7 @@ version = "0.1.0" dependencies = [ "datadog-fips", "datadog-logs-agent", + "datadog-metrics-collector", "datadog-trace-agent", "dogstatsd", "libdd-trace-utils 3.0.1", @@ -1494,6 +1495,36 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "libdd-common" +version = "3.0.1" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +dependencies = [ + "anyhow", + "bytes", + "cc", + "const_format", + "futures", + "futures-core", + "futures-util", + "hex", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "libc", + "nix", + "pin-project", + "regex", + "serde", + "static_assertions", + "thiserror 1.0.69", + 
"tokio", + "tower-service", + "windows-sys 0.52.0", +] + [[package]] name = "libdd-common" version = "4.0.0" diff --git a/crates/datadog-metrics-collector/src/instance.rs b/crates/datadog-metrics-collector/src/instance.rs index 42aa5ce..84fd701 100644 --- a/crates/datadog-metrics-collector/src/instance.rs +++ b/crates/datadog-metrics-collector/src/instance.rs @@ -4,11 +4,7 @@ //! Instance identity metric collector for Azure Functions. //! //! Submits `azure.functions.enhanced.instance` with value 1.0 on each -//! collection tick, tagged with the instance identifier. The env var -//! checked depends on the Azure plan type: -//! -//! - Elastic Premium / Premium: `WEBSITE_INSTANCE_ID` -//! - Flex Consumption / Consumption: `WEBSITE_POD_NAME` or `CONTAINER_NAME` +//! collection tick, tagged with the instance identifier. use dogstatsd::aggregator::AggregatorHandle; use dogstatsd::metric::{Metric, MetricValue, SortedTags}; @@ -33,8 +29,8 @@ fn resolve_instance_id_from( /// /// Checks in order: /// 1. `WEBSITE_INSTANCE_ID` (Elastic Premium / Premium plans) -/// 2. `WEBSITE_POD_NAME` (Flex Consumption plans) -/// 3. `CONTAINER_NAME` (Consumption plans) +/// 2. `WEBSITE_POD_NAME` (Flex Consumption / Consumption plans) +/// 3. 
`CONTAINER_NAME` (Flex Consumption / Consumption plans) fn resolve_instance_id() -> Option { resolve_instance_id_from( env::var("WEBSITE_INSTANCE_ID").ok().as_deref(), @@ -52,9 +48,7 @@ pub struct InstanceMetricsCollector { impl InstanceMetricsCollector { pub fn new(aggregator: AggregatorHandle, tags: Option) -> Self { let instance_id = resolve_instance_id(); - if let Some(ref id) = instance_id { - debug!("Instance ID resolved: {}", id); - } else { + if instance_id.is_none() { debug!("No instance ID found, instance metric will not be submitted"); } Self { diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index c7d440c..ff14f8e 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -46,7 +46,7 @@ use dogstatsd::metric::{EMPTY_TAGS, SortedTags}; use tokio_util::sync::CancellationToken; const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; -const ENHANCED_METRICS_COLLECTION_INTERVAL_SECS: u64 = 1; +const INSTANCE_METRICS_COLLECTION_INTERVAL_SECS: u64 = 3; const DOGSTATSD_TIMEOUT_DURATION: Duration = Duration::from_secs(5); const DEFAULT_DOGSTATSD_PORT: u16 = 8125; const DEFAULT_LOG_INTAKE_PORT: u16 = 10517; @@ -128,17 +128,11 @@ pub async fn main() { .and_then(|v| v.parse::().ok()) .unwrap_or(DEFAULT_LOG_INTAKE_PORT); - // Only enable enhanced metrics for Linux Azure Functions - #[cfg(not(feature = "windows-enhanced-metrics"))] let dd_enhanced_metrics = env_type == EnvironmentType::AzureFunction && env::var("DD_ENHANCED_METRICS_ENABLED") .map(|val| val.to_lowercase() != "false") .unwrap_or(true); - // Enhanced metrics are not yet supported in Windows environments - #[cfg(feature = "windows-enhanced-metrics")] - let dd_enhanced_metrics = false; - let dd_agent_stats_computation_enabled = env::var("DD_AGENT_STATS_COMPUTATION_ENABLED") .map(|val| val.to_lowercase() == "true") @@ -231,6 +225,7 @@ pub async fn main() { // The aggregator is shared between dogstatsd and 
enhanced metrics. // It is started independently so that either can be enabled without the other. + // Only dogstatsd needs the dogstatsd listener let (metrics_flusher, aggregator_handle) = if needs_aggregator { debug!("Creating metrics flusher and aggregator"); @@ -300,7 +295,7 @@ pub async fn main() { let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); let mut enhanced_metrics_collection_interval = interval(Duration::from_secs( - ENHANCED_METRICS_COLLECTION_INTERVAL_SECS, + INSTANCE_METRICS_COLLECTION_INTERVAL_SECS, )); flush_interval.tick().await; // discard first tick, which is instantaneous enhanced_metrics_collection_interval.tick().await; @@ -324,6 +319,9 @@ pub async fn main() { let retry_in = std::mem::take(&mut pending_log_retries); let failed = log_flusher.flush(retry_in).await; if !failed.is_empty() { + // TODO: surface flush failures into health/metrics telemetry so + // operators have a durable signal beyond log lines when logs are + // being dropped (e.g. increment a statsd counter or set a gauge). 
warn!( "log agent flush failed for {} batch(es); will retry next cycle", failed.len() @@ -347,6 +345,7 @@ async fn start_aggregator( https_proxy: Option, dogstatsd_tags: &str, ) -> (Option, AggregatorHandle) { + // Create the aggregator service #[allow(clippy::expect_used)] let (service, handle) = AggregatorService::new( SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), @@ -354,6 +353,7 @@ async fn start_aggregator( ) .expect("Failed to create aggregator service"); + // Start the aggregator service in the background tokio::spawn(service.run()); let metrics_flusher = match dd_api_key { @@ -424,6 +424,7 @@ async fn start_dogstatsd_listener( }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); + // Use handle in DogStatsD (cheap to clone) let dogstatsd_client = DogStatsD::new( &dogstatsd_config, handle.clone(), diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index 98e18c2..61705fd 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -300,7 +300,32 @@ mod tests { } #[test] - fn test_find_metric_origin_azure_functions() { + fn test_find_metric_origin_azure_functions_enhanced() { + let tags = SortedTags::parse("origin:azurefunction").unwrap(); + let metric = Metric { + id: 0, + name: "azure.functions.enhanced.instance".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::AzureFunctionsMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } + + #[test] + fn test_find_metric_origin_azure_functions_custom() { let tags = SortedTags::parse("origin:azurefunction").unwrap(); let metric = Metric { id: 0, @@ -354,31 +379,6 @@ mod tests { assert_eq!(origin, None); } - #[test] - fn 
test_find_metric_origin_azure_functions_enhanced() { - let tags = SortedTags::parse("origin:azurefunction").unwrap(); - let metric = Metric { - id: 0, - name: "azure.functions.enhanced.instance".into(), - value: MetricValue::Gauge(1.0), - tags: Some(tags.clone()), - timestamp: 0, - }; - let origin = metric.find_origin(tags).unwrap(); - assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, - OriginCategory::AzureFunctionsMetrics as u32 - ); - assert_eq!( - origin.origin_service as u32, - OriginService::ServerlessEnhanced as u32 - ); - } - #[test] fn test_find_metric_origin_unknown() { let tags = SortedTags::parse("unknown:tag").unwrap(); From d5f12d42ed97c5957eb6b821a4587644b737d6ea Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Wed, 15 Apr 2026 10:27:07 -0400 Subject: [PATCH 07/20] Remove windows feature for now --- .github/workflows/build-datadog-serverless-compat.yml | 4 ++-- .github/workflows/cargo.yml | 2 +- crates/datadog-metrics-collector/Cargo.toml | 3 --- crates/datadog-serverless-compat/Cargo.toml | 1 - crates/datadog-serverless-compat/src/main.rs | 2 ++ 5 files changed, 5 insertions(+), 7 deletions(-) diff --git a/.github/workflows/build-datadog-serverless-compat.yml b/.github/workflows/build-datadog-serverless-compat.yml index 070938f..8c88500 100644 --- a/.github/workflows/build-datadog-serverless-compat.yml +++ b/.github/workflows/build-datadog-serverless-compat.yml @@ -56,7 +56,7 @@ jobs: retention-days: 3 - if: ${{ inputs.runner == 'windows-2022' }} shell: bash - run: cargo build --release -p datadog-serverless-compat --features windows-pipes,windows-enhanced-metrics + run: cargo build --release -p datadog-serverless-compat --features windows-pipes - if: ${{ inputs.runner == 'windows-2022' }} uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # 4.6.2 with: @@ -69,7 +69,7 @@ jobs: rustup target add i686-pc-windows-msvc cargo build --release -p 
datadog-serverless-compat \ --target i686-pc-windows-msvc \ - --features windows-pipes,windows-enhanced-metrics + --features windows-pipes - if: ${{ inputs.runner == 'windows-2022' }} uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # 4.6.2 with: diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index 7863afd..fc640c5 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -95,7 +95,7 @@ jobs: - shell: bash run: | if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then - cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes,datadog-serverless-compat/windows-enhanced-metrics + cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes else cargo nextest run --workspace fi diff --git a/crates/datadog-metrics-collector/Cargo.toml b/crates/datadog-metrics-collector/Cargo.toml index 09bac29..7e6d0b9 100644 --- a/crates/datadog-metrics-collector/Cargo.toml +++ b/crates/datadog-metrics-collector/Cargo.toml @@ -9,6 +9,3 @@ description = "Collector to read, compute, and submit enhanced metrics in Server dogstatsd = { path = "../dogstatsd", default-features = true } tracing = { version = "0.1", default-features = false } libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", default-features = false } - -[features] -windows-enhanced-metrics = [] diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index c33e114..83a794a 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -8,7 +8,6 @@ description = "Binary to run trace-agent and dogstatsd servers in Serverless env [features] default = [] windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"] -windows-enhanced-metrics = ["datadog-metrics-collector/windows-enhanced-metrics"] [dependencies] datadog-logs-agent = { path = 
"../datadog-logs-agent" } diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index ff14f8e..f3cb921 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -339,6 +339,8 @@ pub async fn main() { } } +/// Starts the metrics aggregator service and creates a flusher to send +/// aggregated metrics to the Datadog intake. async fn start_aggregator( dd_api_key: Option, dd_site: String, From a3b82c0f009a3bd389b36413a649a6195e50a44c Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Wed, 15 Apr 2026 12:36:45 -0400 Subject: [PATCH 08/20] Add precondition for enhanced metrics collector in tokio select loop --- crates/datadog-serverless-compat/src/main.rs | 6 +- .../2026-04-13-instance-enhanced-metric.md | 791 ++++++++++++++++++ 2 files changed, 794 insertions(+), 3 deletions(-) create mode 100644 docs/superpowers/plans/2026-04-13-instance-enhanced-metric.md diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index f3cb921..cae53c6 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -294,11 +294,11 @@ pub async fn main() { }; let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); - let mut enhanced_metrics_collection_interval = interval(Duration::from_secs( + let mut instance_metrics_collection_interval = interval(Duration::from_secs( INSTANCE_METRICS_COLLECTION_INTERVAL_SECS, )); flush_interval.tick().await; // discard first tick, which is instantaneous - enhanced_metrics_collection_interval.tick().await; + instance_metrics_collection_interval.tick().await; // Builders for log batches that failed transiently in the previous flush // cycle. They are redriven on the next cycle before new batches are sent. 
@@ -330,7 +330,7 @@ pub async fn main() { } } } - _ = enhanced_metrics_collection_interval.tick() => { + _ = instance_metrics_collection_interval.tick(), if instance_collector.is_some() => { if let Some(ref collector) = instance_collector { collector.collect_and_submit(); } diff --git a/docs/superpowers/plans/2026-04-13-instance-enhanced-metric.md b/docs/superpowers/plans/2026-04-13-instance-enhanced-metric.md new file mode 100644 index 0000000..f348264 --- /dev/null +++ b/docs/superpowers/plans/2026-04-13-instance-enhanced-metric.md @@ -0,0 +1,791 @@ +# Instance Enhanced Metric Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add an `azure.functions.enhanced.instance` metric that reports the Azure Functions instance identity, enabling per-instance observability. + +**Architecture:** Create the `datadog-metrics-collector` crate (ported from the CPU metrics branch scaffolding, minus CPU-specific code). The crate exposes an `InstanceMetricsCollector` that reads instance identity from env vars (`WEBSITE_INSTANCE_ID`, `WEBSITE_POD_NAME`, `CONTAINER_NAME`) and submits a **gauge** metric with value `1.0` on each collection tick, with the instance ID as an `instance_id` tag. This follows the datadog-agent pattern (PR 47421) where usage/instance metrics are gauges (not distributions) because the instance tag already provides a unique identifier, avoiding aggregation issues. The collector is wired into `datadog-serverless-compat`'s main loop via a `tokio::select!` arm, sharing the existing DogStatsD aggregator. Origin classification in `dogstatsd/src/origin.rs` needs the `azure.functions` prefix added to route instance metrics as `ServerlessEnhanced`. 
+ +**Tech Stack:** Rust, tokio, dogstatsd crate (local), libdd-common (libdatadog) + +--- + +### Task 1: Create `datadog-metrics-collector` crate with shared tag builder + +**Files:** +- Create: `crates/datadog-metrics-collector/Cargo.toml` +- Create: `crates/datadog-metrics-collector/src/lib.rs` + +This task creates the crate shell. No metric logic yet. The workspace `Cargo.toml` uses `crates/*` glob so no workspace edit is needed. + +- [ ] **Step 1: Create `Cargo.toml`** + +```toml +[package] +name = "datadog-metrics-collector" +version = "0.1.0" +edition.workspace = true +license.workspace = true +description = "Collector to read, compute, and submit enhanced metrics in Serverless environments" + +[dependencies] +dogstatsd = { path = "../dogstatsd", default-features = true } +tracing = { version = "0.1", default-features = false } +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95", default-features = false } + +[features] +windows-enhanced-metrics = [] +``` + +Note: `num_cpus` is intentionally omitted — it's only needed for CPU metrics, not instance metrics. + +- [ ] **Step 2: Create `lib.rs`** + +```rust +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(test), deny(clippy::panic))] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] +#![cfg_attr(not(test), deny(clippy::expect_used))] +#![cfg_attr(not(test), deny(clippy::todo))] +#![cfg_attr(not(test), deny(clippy::unimplemented))] + +pub mod instance; +pub mod tags; +``` + +- [ ] **Step 3: Verify the crate compiles** + +Run: `cargo check -p datadog-metrics-collector` +Expected: success (will fail until Task 2 and 3 create the modules) + +Note: This step will be verified after Tasks 2 and 3 are done. 
+ +- [ ] **Step 4: Commit** + +```bash +git add crates/datadog-metrics-collector/Cargo.toml crates/datadog-metrics-collector/src/lib.rs +git commit -m "feat(metrics-collector): create datadog-metrics-collector crate shell" +``` + +--- + +### Task 2: Extract shared tag builder into `tags.rs` + +**Files:** +- Create: `crates/datadog-metrics-collector/src/tags.rs` + +The tag builder is lifted from `cpu.rs` on the CPU branch. It's shared infrastructure for all enhanced metrics (instance, CPU, memory, etc.), so it lives in its own module. + +- [ ] **Step 1: Create `tags.rs`** + +```rust +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Shared tag builder for enhanced metrics. +//! +//! Tags are attached to all enhanced metrics submitted by the metrics collector. + +use dogstatsd::metric::SortedTags; +use libdd_common::azure_app_services; +use std::env; + +/// Builds the common tags for all enhanced metrics. +/// +/// Sources: +/// - Azure metadata (resource_group, subscription_id, name) from libdd_common +/// - Environment variables (region, plan_tier, service, env, version, serverless_compat_version) +/// +/// The DogStatsD origin tag (e.g. `origin:azurefunction`) is added by the aggregator, +/// not here. 
+pub fn build_enhanced_metrics_tags() -> Option { + let mut tag_parts = Vec::new(); + + if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION { + let aas_tags = [ + ("resource_group", aas_metadata.get_resource_group()), + ("subscription_id", aas_metadata.get_subscription_id()), + ("name", aas_metadata.get_site_name()), + ]; + for (name, value) in aas_tags { + if value != "unknown" { + tag_parts.push(format!("{}:{}", name, value)); + } + } + } + + for (tag_name, env_var) in [ + ("region", "REGION_NAME"), + ("plan_tier", "WEBSITE_SKU"), + ("service", "DD_SERVICE"), + ("env", "DD_ENV"), + ("version", "DD_VERSION"), + ("serverless_compat_version", "DD_SERVERLESS_COMPAT_VERSION"), + ] { + if let Ok(val) = env::var(env_var) + && !val.is_empty() + { + tag_parts.push(format!("{}:{}", tag_name, val)); + } + } + + if tag_parts.is_empty() { + return None; + } + SortedTags::parse(&tag_parts.join(",")).ok() +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add crates/datadog-metrics-collector/src/tags.rs +git commit -m "feat(metrics-collector): add shared tag builder for enhanced metrics" +``` + +--- + +### Task 3: Implement `InstanceMetricsCollector` + +**Files:** +- Create: `crates/datadog-metrics-collector/src/instance.rs` + +The instance metric is simple: read the instance ID from env vars, submit `azure.functions.enhanced.instance` as a **gauge** with value `1.0` and an `instance_id` tag. Following the datadog-agent pattern (PR 47421), usage/instance metrics use gauges because the instance tag provides a unique identifier — no aggregation issues like CPU metrics have. No delta computation, no OS-specific reader. + +- [ ] **Step 1: Write failing test for `resolve_instance_id`** + +Create `crates/datadog-metrics-collector/src/instance.rs` with the test first: + +```rust +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Instance identity metric collector for Azure Functions. +//! +//! 
Submits `azure.functions.enhanced.instance` with value 1.0 on each +//! collection tick, tagged with the instance identifier. The env var +//! checked depends on the Azure plan type: +//! +//! - Elastic Premium / Premium: `WEBSITE_INSTANCE_ID` +//! - Flex Consumption / Consumption: `WEBSITE_POD_NAME` or `CONTAINER_NAME` + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_resolve_instance_id_returns_none_when_no_env_vars() { + // Ensure none of the vars are set (they shouldn't be in test env) + let id = resolve_instance_id_from(None, None, None); + assert!(id.is_none()); + } + + #[test] + fn test_resolve_instance_id_prefers_website_instance_id() { + let id = resolve_instance_id_from( + Some("instance-abc"), + Some("pod-xyz"), + Some("container-123"), + ); + assert_eq!(id, Some("instance-abc".to_string())); + } + + #[test] + fn test_resolve_instance_id_falls_back_to_pod_name() { + let id = resolve_instance_id_from(None, Some("pod-xyz"), Some("container-123")); + assert_eq!(id, Some("pod-xyz".to_string())); + } + + #[test] + fn test_resolve_instance_id_falls_back_to_container_name() { + let id = resolve_instance_id_from(None, None, Some("container-123")); + assert_eq!(id, Some("container-123".to_string())); + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cargo test -p datadog-metrics-collector -- test_resolve_instance_id 2>&1` +Expected: FAIL — `resolve_instance_id_from` not found + +- [ ] **Step 3: Implement `resolve_instance_id_from` and `resolve_instance_id`** + +Add above the `#[cfg(test)]` block: + +```rust +use dogstatsd::aggregator::AggregatorHandle; +use dogstatsd::metric::{Metric, MetricValue, SortedTags}; +use std::env; +use tracing::{debug, error, info}; + +const INSTANCE_METRIC: &str = "azure.functions.enhanced.instance"; + +/// Resolves the instance ID from explicit values (used by tests). 
+fn resolve_instance_id_from( + website_instance_id: Option<&str>, + website_pod_name: Option<&str>, + container_name: Option<&str>, +) -> Option { + website_instance_id + .or(website_pod_name) + .or(container_name) + .map(String::from) +} + +/// Resolves the instance ID from environment variables. +/// +/// Checks in order: +/// 1. `WEBSITE_INSTANCE_ID` (Elastic Premium / Premium plans) +/// 2. `WEBSITE_POD_NAME` (Flex Consumption plans) +/// 3. `CONTAINER_NAME` (Consumption plans) +fn resolve_instance_id() -> Option { + resolve_instance_id_from( + env::var("WEBSITE_INSTANCE_ID").ok().as_deref(), + env::var("WEBSITE_POD_NAME").ok().as_deref(), + env::var("CONTAINER_NAME").ok().as_deref(), + ) +} + +pub struct InstanceMetricsCollector { + aggregator: AggregatorHandle, + tags: Option, + instance_id: Option, +} + +impl InstanceMetricsCollector { + pub fn new(aggregator: AggregatorHandle, tags: Option) -> Self { + let instance_id = resolve_instance_id(); + if let Some(ref id) = instance_id { + info!("Instance ID resolved: {}", id); + } else { + debug!("No instance ID found, instance metric will not be submitted"); + } + Self { + aggregator, + tags, + instance_id, + } + } + + pub fn collect_and_submit(&self) { + let Some(ref instance_id) = self.instance_id else { + debug!("No instance ID available, skipping instance metric"); + return; + }; + + // Build tags: start with shared tags, add instance_id + let instance_tag = format!("instance_id:{}", instance_id); + let tag_string = match &self.tags { + Some(existing) => format!("{},{}", existing, instance_tag), + None => instance_tag, + }; + let tags = SortedTags::parse(&tag_string).ok(); + + let now = std::time::UNIX_EPOCH + .elapsed() + .map(|d| d.as_secs()) + .unwrap_or(0) + .try_into() + .unwrap_or(0); + + let metric = Metric::new( + INSTANCE_METRIC.into(), + MetricValue::gauge(1.0), + tags, + Some(now), + ); + + if let Err(e) = self.aggregator.insert_batch(vec![metric]) { + error!("Failed to insert instance metric: 
{}", e); + } + } +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cargo test -p datadog-metrics-collector -- test_resolve_instance_id` +Expected: all 4 tests PASS + +- [ ] **Step 5: Verify crate compiles** + +Run: `cargo check -p datadog-metrics-collector` +Expected: success + +- [ ] **Step 6: Commit** + +```bash +git add crates/datadog-metrics-collector/src/instance.rs +git commit -m "feat(metrics-collector): add instance identity metric collector" +``` + +--- + +### Task 4: Add `azure.functions` prefix to origin classification + +**Files:** +- Modify: `crates/dogstatsd/src/origin.rs` + +The current `main` branch doesn't include `azure.functions` in the enhanced-service prefix check. The CPU branch added it. We need it so `azure.functions.enhanced.instance` gets classified as `ServerlessEnhanced`. + +- [ ] **Step 1: Write failing test for the new origin classification** + +Add this test to the `mod tests` block at the bottom of `crates/dogstatsd/src/origin.rs`: + +```rust + #[test] + fn test_find_metric_origin_azure_functions_enhanced() { + let tags = SortedTags::parse("origin:azurefunction").unwrap(); + let metric = Metric { + id: 0, + name: "azure.functions.enhanced.instance".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::AzureFunctionsMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cargo test -p dogstatsd -- test_find_metric_origin_azure_functions_enhanced` +Expected: FAIL — `azure.functions` prefix not matched for `ServerlessEnhanced`, falls through to `ServerlessCustom` + +- [ ] **Step 3: Add `azure.functions` prefix constant and update matching** + +In 
`crates/dogstatsd/src/origin.rs`, add the constant: + +```rust +const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; +``` + +And update the service matching in `find_origin` to include it: + +```rust + } else if metric_prefix == AWS_LAMBDA_PREFIX + || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX + || metric_prefix == AZURE_FUNCTIONS_PREFIX + { +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cargo test -p dogstatsd -- test_find_metric_origin_azure_functions_enhanced` +Expected: PASS + +- [ ] **Step 5: Run all dogstatsd tests to check for regressions** + +Run: `cargo test -p dogstatsd` +Expected: all tests pass + +- [ ] **Step 6: Commit** + +```bash +git add crates/dogstatsd/src/origin.rs +git commit -m "feat(dogstatsd): classify azure.functions prefix as ServerlessEnhanced origin" +``` + +--- + +### Task 5: Wire `InstanceMetricsCollector` into `main.rs` + +**Files:** +- Modify: `crates/datadog-serverless-compat/Cargo.toml` +- Modify: `crates/datadog-serverless-compat/src/main.rs` + +This is the integration task. The main changes to `main.rs`: +1. Add `DD_ENHANCED_METRICS_ENABLED` env var check (Azure Functions only, default true, disabled on Windows) +2. Refactor aggregator creation to be shared between dogstatsd and enhanced metrics (same pattern as CPU branch) +3. Add a `tokio::select!` loop with separate flush and collection intervals +4. 
Create and run the `InstanceMetricsCollector` + +- [ ] **Step 1: Add `datadog-metrics-collector` dependency to `Cargo.toml`** + +Add to `[dependencies]` in `crates/datadog-serverless-compat/Cargo.toml`: + +```toml +datadog-metrics-collector = { path = "../datadog-metrics-collector" } +``` + +Add to `[features]`: + +```toml +windows-enhanced-metrics = ["datadog-metrics-collector/windows-enhanced-metrics"] +``` + +- [ ] **Step 2: Update `main.rs` — add import and collection interval constant** + +Add import near the top with other use statements: + +```rust +use datadog_metrics_collector::instance::InstanceMetricsCollector; +``` + +Add constant: + +```rust +const ENHANCED_METRICS_COLLECTION_INTERVAL_SECS: u64 = 10; +``` + +- [ ] **Step 3: Update `main.rs` — add `dd_enhanced_metrics` env var check** + +After the `dd_logs_enabled` / `dd_logs_port` env var reads (around line 121), add: + +```rust + // Only enable enhanced metrics for Linux Azure Functions + #[cfg(not(feature = "windows-enhanced-metrics"))] + let dd_enhanced_metrics = env_type == EnvironmentType::AzureFunction + && env::var("DD_ENHANCED_METRICS_ENABLED") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); + + // Enhanced metrics are not yet supported in Windows environments + #[cfg(feature = "windows-enhanced-metrics")] + let dd_enhanced_metrics = false; +``` + +- [ ] **Step 4: Update `main.rs` — refactor aggregator to be shared** + +Replace the dogstatsd startup block (lines 185-207) and the code down through the flush loop with the shared-aggregator pattern from the CPU branch. The key structural change is: + +1. Create aggregator when `dd_use_dogstatsd || dd_enhanced_metrics` +2. Start DogStatsD listener separately (only if `dd_use_dogstatsd`) +3. Create `InstanceMetricsCollector` when enhanced metrics enabled +4. 
Use `tokio::select!` with both flush and collection intervals + +Replace lines 185-207 (the dogstatsd startup block) with: + +```rust + let needs_aggregator = dd_use_dogstatsd || dd_enhanced_metrics; + + // The aggregator is shared between dogstatsd and enhanced metrics. + // It is started independently so that either can be enabled without the other. + let (metrics_flusher, aggregator_handle) = if needs_aggregator { + debug!("Creating metrics flusher and aggregator"); + + let (flusher, handle) = + start_aggregator(dd_api_key.clone(), dd_site, https_proxy.clone(), dogstatsd_tags).await; + + if dd_use_dogstatsd { + debug!("Starting dogstatsd"); + let _ = start_dogstatsd_listener( + dd_dogstatsd_port, + handle.clone(), + dd_statsd_metric_namespace, + #[cfg(all(windows, feature = "windows-pipes"))] + dd_dogstatsd_windows_pipe_name.clone(), + ) + .await; + if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { + info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + } else { + info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + } + } else { + info!("dogstatsd disabled"); + } + (flusher, Some(handle)) + } else { + info!("dogstatsd and enhanced metrics disabled"); + (None, None) + }; +``` + +- [ ] **Step 5: Update `main.rs` — create instance collector** + +After the aggregator block, add: + +```rust + let instance_collector = if dd_enhanced_metrics && metrics_flusher.is_some() { + aggregator_handle.as_ref().map(|handle| { + let tags = datadog_metrics_collector::tags::build_enhanced_metrics_tags(); + InstanceMetricsCollector::new(handle.clone(), tags) + }) + } else { + if !dd_enhanced_metrics { + info!("Enhanced metrics disabled"); + } else { + info!("Enhanced metrics enabled but metrics flusher not found"); + } + None + }; +``` + +- [ ] **Step 6: Update `main.rs` — replace flush loop with `tokio::select!`** + +Replace the existing flush loop (from `let mut flush_interval` through end of `loop`) with: + +```rust + 
let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL));
    let mut enhanced_metrics_collection_interval =
        interval(Duration::from_secs(ENHANCED_METRICS_COLLECTION_INTERVAL_SECS));
    flush_interval.tick().await; // discard first tick, which is instantaneous
    enhanced_metrics_collection_interval.tick().await;

    // Builders for log batches that failed transiently in the previous flush
    // cycle. They are redriven on the next cycle before new batches are sent.
    let mut pending_log_retries: Vec<_> = Vec::new();

    loop {
        tokio::select! {
            _ = flush_interval.tick() => {
                if let Some(metrics_flusher) = metrics_flusher.clone() {
                    debug!("Flushing dogstatsd metrics");
                    tokio::spawn(async move {
                        metrics_flusher.flush().await;
                    });
                }

                if let Some(log_flusher) = log_flusher.as_ref() {
                    debug!("Flushing log agent");
                    let retry_in = std::mem::take(&mut pending_log_retries);
                    let failed = log_flusher.flush(retry_in).await;
                    if !failed.is_empty() {
                        warn!(
                            "log agent flush failed for {} batch(es); will retry next cycle",
                            failed.len()
                        );
                        pending_log_retries = failed;
                    }
                }
            }
            _ = enhanced_metrics_collection_interval.tick() => {
                if let Some(ref collector) = instance_collector {
                    collector.collect_and_submit();
                }
            }
        }
    }
```

- [ ] **Step 7: Update `main.rs` — refactor `start_dogstatsd` into `start_aggregator` + `start_dogstatsd_listener`**

Replace the existing `start_dogstatsd` function with two functions:

```rust
async fn start_aggregator(
    dd_api_key: Option<String>,
    dd_site: String,
    https_proxy: Option<String>,
    dogstatsd_tags: &str,
) -> (Option<Flusher>, AggregatorHandle) {
    #[allow(clippy::expect_used)]
    let (service, handle) = AggregatorService::new(
        SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS),
        CONTEXTS,
    )
    .expect("Failed to create aggregator service");

    tokio::spawn(service.run());

    let metrics_flusher = match dd_api_key {
        Some(dd_api_key) => {
            let client = match 
build_metrics_client(https_proxy, DOGSTATSD_TIMEOUT_DURATION) {
                Ok(client) => client,
                Err(e) => {
                    error!("Failed to build HTTP client: {e}, won't flush metrics");
                    return (None, handle);
                }
            };
            let metrics_intake_url_prefix = match Site::new(dd_site)
                .map_err(|e| e.to_string())
                .and_then(|site| {
                    MetricsIntakeUrlPrefix::new(Some(site), None).map_err(|e| e.to_string())
                }) {
                Ok(prefix) => prefix,
                Err(e) => {
                    error!("Failed to create metrics intake URL: {e}, won't flush metrics");
                    return (None, handle);
                }
            };

            let metrics_flusher = Flusher::new(FlusherConfig {
                api_key_factory: Arc::new(ApiKeyFactory::new(&dd_api_key)),
                aggregator_handle: handle.clone(),
                metrics_intake_url_prefix,
                client,
                retry_strategy: RetryStrategy::LinearBackoff(3, 1),
                compression_level: CompressionLevel::try_from(6).unwrap_or_default(),
            });
            Some(metrics_flusher)
        }
        None => {
            error!("DD_API_KEY not set, won't flush metrics");
            None
        }
    };

    (metrics_flusher, handle)
}

async fn start_dogstatsd_listener(
    port: u16,
    handle: AggregatorHandle,
    metric_namespace: Option<String>,
    #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option<String>,
) -> CancellationToken {
    #[cfg(all(windows, feature = "windows-pipes"))]
    let dogstatsd_config = DogStatsDConfig {
        host: AGENT_HOST.to_string(),
        port,
        metric_namespace,
        windows_pipe_name,
        so_rcvbuf: None,
        buffer_size: None,
        queue_size: None,
    };

    #[cfg(not(all(windows, feature = "windows-pipes")))]
    let dogstatsd_config = DogStatsDConfig {
        host: AGENT_HOST.to_string(),
        port,
        metric_namespace,
        so_rcvbuf: None,
        buffer_size: None,
        queue_size: None,
    };
    let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();

    let dogstatsd_client = DogStatsD::new(
        &dogstatsd_config,
        handle.clone(),
        dogstatsd_cancel_token.clone(),
    )
    .await;

    tokio::spawn(async move {
        dogstatsd_client.spin().await;
    });

    dogstatsd_cancel_token
}
```

- [ 
] **Step 8: Remove unused import `warn` if no longer needed, clean up** + +Check if `warn` is still used (it is, for log flush failures). Remove the `_aggregator_handle` underscore-prefixed variable since it's now used. + +- [ ] **Step 9: Verify everything compiles** + +Run: `cargo check -p datadog-serverless-compat` +Expected: success + +- [ ] **Step 10: Run all workspace tests** + +Run: `cargo test --workspace` +Expected: all tests pass + +- [ ] **Step 11: Commit** + +```bash +git add crates/datadog-serverless-compat/Cargo.toml crates/datadog-serverless-compat/src/main.rs +git commit -m "feat(serverless-compat): wire instance metrics collector into main loop" +``` + +--- + +### Task 6: Update CI workflows for `windows-enhanced-metrics` feature + +**Files:** +- Modify: `.github/workflows/build-datadog-serverless-compat.yml` +- Modify: `.github/workflows/cargo.yml` + +Windows builds need the `windows-enhanced-metrics` feature flag so that the feature-gated code compiles correctly. + +- [ ] **Step 1: Update `build-datadog-serverless-compat.yml`** + +Change the Windows build command from: + +```yaml +run: cargo build --release -p datadog-serverless-compat --features windows-pipes +``` + +to: + +```yaml +run: cargo build --release -p datadog-serverless-compat --features windows-pipes,windows-enhanced-metrics +``` + +- [ ] **Step 2: Update `cargo.yml`** + +Change the Windows test command from: + +```yaml +cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes +``` + +to: + +```yaml +cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes,datadog-serverless-compat/windows-enhanced-metrics +``` + +- [ ] **Step 3: Commit** + +```bash +git add .github/workflows/build-datadog-serverless-compat.yml .github/workflows/cargo.yml +git commit -m "ci: add windows-enhanced-metrics feature flag to CI builds" +``` + +--- + +### Task 7: Update `Cargo.lock` and `LICENSE-3rdparty.csv` + +**Files:** +- Modify: `Cargo.lock` 
(auto-generated) +- Modify: `LICENSE-3rdparty.csv` (if any new third-party deps) + +- [ ] **Step 1: Generate lock file** + +Run: `cargo generate-lockfile` or just `cargo check` to update `Cargo.lock` + +- [ ] **Step 2: Check if LICENSE-3rdparty.csv needs updating** + +Since we're only adding local crate dependencies (dogstatsd, libdd-common which are already in the dependency tree), this likely needs no changes. Verify by checking if the CI has a license check step and whether it passes. + +- [ ] **Step 3: Commit if changed** + +```bash +git add Cargo.lock +git commit -m "chore: update Cargo.lock for datadog-metrics-collector" +``` + +--- + +### Task 8: Final verification + +- [ ] **Step 1: Run full workspace check** + +Run: `cargo check --workspace` +Expected: success + +- [ ] **Step 2: Run full workspace tests** + +Run: `cargo test --workspace` +Expected: all tests pass + +- [ ] **Step 3: Run clippy** + +Run: `cargo clippy --workspace -- -D warnings` +Expected: no warnings + +- [ ] **Step 4: Verify Windows feature flag compiles** + +Run: `cargo check -p datadog-serverless-compat --features windows-enhanced-metrics,windows-pipes` +Expected: success (will use stub code paths) From 45473cb038a08016ec1f9bedf479e19facbecba9 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Wed, 15 Apr 2026 13:21:04 -0400 Subject: [PATCH 09/20] Precompute tags in new() rather than building them in collect_and_submit(), change missing instance log to warn --- .../datadog-metrics-collector/src/instance.rs | 45 +- crates/datadog-serverless-compat/src/main.rs | 2 +- .../2026-04-13-instance-enhanced-metric.md | 791 ------------------ 3 files changed, 22 insertions(+), 816 deletions(-) delete mode 100644 docs/superpowers/plans/2026-04-13-instance-enhanced-metric.md diff --git a/crates/datadog-metrics-collector/src/instance.rs b/crates/datadog-metrics-collector/src/instance.rs index 84fd701..18f3a48 100644 --- a/crates/datadog-metrics-collector/src/instance.rs +++ 
b/crates/datadog-metrics-collector/src/instance.rs @@ -9,7 +9,7 @@ use dogstatsd::aggregator::AggregatorHandle; use dogstatsd::metric::{Metric, MetricValue, SortedTags}; use std::env; -use tracing::{debug, error}; +use tracing::{error, warn}; const INSTANCE_METRIC: &str = "azure.functions.enhanced.instance"; @@ -42,42 +42,39 @@ fn resolve_instance_id() -> Option { pub struct InstanceMetricsCollector { aggregator: AggregatorHandle, tags: Option, - instance_id: Option, } impl InstanceMetricsCollector { - pub fn new(aggregator: AggregatorHandle, tags: Option) -> Self { + /// Creates a new collector, returning `None` if no instance ID is found. + pub fn new(aggregator: AggregatorHandle, tags: Option) -> Option { let instance_id = resolve_instance_id(); - if instance_id.is_none() { - debug!("No instance ID found, instance metric will not be submitted"); - } - Self { - aggregator, - tags, - instance_id, - } - } - - pub fn collect_and_submit(&self) { - let Some(ref instance_id) = self.instance_id else { - debug!("No instance ID available, skipping instance metric"); - return; + let Some(instance_id) = instance_id else { + warn!("No instance ID found, instance metric will not be submitted"); + return None; }; - // Build tags: start with shared tags, add instance + // Precompute tags: enhanced metrics tags + instance tag let instance_tag = format!("instance:{}", instance_id); - let tags = match &self.tags { - Some(existing) => { - let mut combined = existing.clone(); + let tags = match tags { + Some(mut existing) => { if let Ok(id_tag) = SortedTags::parse(&instance_tag) { - combined.extend(&id_tag); + existing.extend(&id_tag); } - Some(combined) + Some(existing) } None => SortedTags::parse(&instance_tag).ok(), }; - let metric = Metric::new(INSTANCE_METRIC.into(), MetricValue::gauge(1.0), tags, None); + Some(Self { aggregator, tags }) + } + + pub fn collect_and_submit(&self) { + let metric = Metric::new( + INSTANCE_METRIC.into(), + MetricValue::gauge(1.0), + 
self.tags.clone(), + None, + ); if let Err(e) = self.aggregator.insert_batch(vec![metric]) { error!("Failed to insert instance metric: {}", e); diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index cae53c6..fc27df4 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -262,7 +262,7 @@ pub async fn main() { }; let instance_collector = if dd_enhanced_metrics && metrics_flusher.is_some() { - aggregator_handle.as_ref().map(|handle| { + aggregator_handle.as_ref().and_then(|handle| { let tags = datadog_metrics_collector::tags::build_enhanced_metrics_tags(); InstanceMetricsCollector::new(handle.clone(), tags) }) diff --git a/docs/superpowers/plans/2026-04-13-instance-enhanced-metric.md b/docs/superpowers/plans/2026-04-13-instance-enhanced-metric.md deleted file mode 100644 index f348264..0000000 --- a/docs/superpowers/plans/2026-04-13-instance-enhanced-metric.md +++ /dev/null @@ -1,791 +0,0 @@ -# Instance Enhanced Metric Implementation Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Add an `azure.functions.enhanced.instance` metric that reports the Azure Functions instance identity, enabling per-instance observability. - -**Architecture:** Create the `datadog-metrics-collector` crate (ported from the CPU metrics branch scaffolding, minus CPU-specific code). The crate exposes an `InstanceMetricsCollector` that reads instance identity from env vars (`WEBSITE_INSTANCE_ID`, `WEBSITE_POD_NAME`, `CONTAINER_NAME`) and submits a **gauge** metric with value `1.0` on each collection tick, with the instance ID as an `instance_id` tag. 
This follows the datadog-agent pattern (PR 47421) where usage/instance metrics are gauges (not distributions) because the instance tag already provides a unique identifier, avoiding aggregation issues. The collector is wired into `datadog-serverless-compat`'s main loop via a `tokio::select!` arm, sharing the existing DogStatsD aggregator. Origin classification in `dogstatsd/src/origin.rs` needs the `azure.functions` prefix added to route instance metrics as `ServerlessEnhanced`. - -**Tech Stack:** Rust, tokio, dogstatsd crate (local), libdd-common (libdatadog) - ---- - -### Task 1: Create `datadog-metrics-collector` crate with shared tag builder - -**Files:** -- Create: `crates/datadog-metrics-collector/Cargo.toml` -- Create: `crates/datadog-metrics-collector/src/lib.rs` - -This task creates the crate shell. No metric logic yet. The workspace `Cargo.toml` uses `crates/*` glob so no workspace edit is needed. - -- [ ] **Step 1: Create `Cargo.toml`** - -```toml -[package] -name = "datadog-metrics-collector" -version = "0.1.0" -edition.workspace = true -license.workspace = true -description = "Collector to read, compute, and submit enhanced metrics in Serverless environments" - -[dependencies] -dogstatsd = { path = "../dogstatsd", default-features = true } -tracing = { version = "0.1", default-features = false } -libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95", default-features = false } - -[features] -windows-enhanced-metrics = [] -``` - -Note: `num_cpus` is intentionally omitted — it's only needed for CPU metrics, not instance metrics. - -- [ ] **Step 2: Create `lib.rs`** - -```rust -// Copyright 2023-Present Datadog, Inc. 
https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -#![cfg_attr(not(test), deny(clippy::panic))] -#![cfg_attr(not(test), deny(clippy::unwrap_used))] -#![cfg_attr(not(test), deny(clippy::expect_used))] -#![cfg_attr(not(test), deny(clippy::todo))] -#![cfg_attr(not(test), deny(clippy::unimplemented))] - -pub mod instance; -pub mod tags; -``` - -- [ ] **Step 3: Verify the crate compiles** - -Run: `cargo check -p datadog-metrics-collector` -Expected: success (will fail until Task 2 and 3 create the modules) - -Note: This step will be verified after Tasks 2 and 3 are done. - -- [ ] **Step 4: Commit** - -```bash -git add crates/datadog-metrics-collector/Cargo.toml crates/datadog-metrics-collector/src/lib.rs -git commit -m "feat(metrics-collector): create datadog-metrics-collector crate shell" -``` - ---- - -### Task 2: Extract shared tag builder into `tags.rs` - -**Files:** -- Create: `crates/datadog-metrics-collector/src/tags.rs` - -The tag builder is lifted from `cpu.rs` on the CPU branch. It's shared infrastructure for all enhanced metrics (instance, CPU, memory, etc.), so it lives in its own module. - -- [ ] **Step 1: Create `tags.rs`** - -```rust -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -//! Shared tag builder for enhanced metrics. -//! -//! Tags are attached to all enhanced metrics submitted by the metrics collector. - -use dogstatsd::metric::SortedTags; -use libdd_common::azure_app_services; -use std::env; - -/// Builds the common tags for all enhanced metrics. -/// -/// Sources: -/// - Azure metadata (resource_group, subscription_id, name) from libdd_common -/// - Environment variables (region, plan_tier, service, env, version, serverless_compat_version) -/// -/// The DogStatsD origin tag (e.g. `origin:azurefunction`) is added by the aggregator, -/// not here. 
-pub fn build_enhanced_metrics_tags() -> Option { - let mut tag_parts = Vec::new(); - - if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION { - let aas_tags = [ - ("resource_group", aas_metadata.get_resource_group()), - ("subscription_id", aas_metadata.get_subscription_id()), - ("name", aas_metadata.get_site_name()), - ]; - for (name, value) in aas_tags { - if value != "unknown" { - tag_parts.push(format!("{}:{}", name, value)); - } - } - } - - for (tag_name, env_var) in [ - ("region", "REGION_NAME"), - ("plan_tier", "WEBSITE_SKU"), - ("service", "DD_SERVICE"), - ("env", "DD_ENV"), - ("version", "DD_VERSION"), - ("serverless_compat_version", "DD_SERVERLESS_COMPAT_VERSION"), - ] { - if let Ok(val) = env::var(env_var) - && !val.is_empty() - { - tag_parts.push(format!("{}:{}", tag_name, val)); - } - } - - if tag_parts.is_empty() { - return None; - } - SortedTags::parse(&tag_parts.join(",")).ok() -} -``` - -- [ ] **Step 2: Commit** - -```bash -git add crates/datadog-metrics-collector/src/tags.rs -git commit -m "feat(metrics-collector): add shared tag builder for enhanced metrics" -``` - ---- - -### Task 3: Implement `InstanceMetricsCollector` - -**Files:** -- Create: `crates/datadog-metrics-collector/src/instance.rs` - -The instance metric is simple: read the instance ID from env vars, submit `azure.functions.enhanced.instance` as a **gauge** with value `1.0` and an `instance_id` tag. Following the datadog-agent pattern (PR 47421), usage/instance metrics use gauges because the instance tag provides a unique identifier — no aggregation issues like CPU metrics have. No delta computation, no OS-specific reader. - -- [ ] **Step 1: Write failing test for `resolve_instance_id`** - -Create `crates/datadog-metrics-collector/src/instance.rs` with the test first: - -```rust -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -//! Instance identity metric collector for Azure Functions. -//! -//! 
Submits `azure.functions.enhanced.instance` with value 1.0 on each -//! collection tick, tagged with the instance identifier. The env var -//! checked depends on the Azure plan type: -//! -//! - Elastic Premium / Premium: `WEBSITE_INSTANCE_ID` -//! - Flex Consumption / Consumption: `WEBSITE_POD_NAME` or `CONTAINER_NAME` - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_resolve_instance_id_returns_none_when_no_env_vars() { - // Ensure none of the vars are set (they shouldn't be in test env) - let id = resolve_instance_id_from(None, None, None); - assert!(id.is_none()); - } - - #[test] - fn test_resolve_instance_id_prefers_website_instance_id() { - let id = resolve_instance_id_from( - Some("instance-abc"), - Some("pod-xyz"), - Some("container-123"), - ); - assert_eq!(id, Some("instance-abc".to_string())); - } - - #[test] - fn test_resolve_instance_id_falls_back_to_pod_name() { - let id = resolve_instance_id_from(None, Some("pod-xyz"), Some("container-123")); - assert_eq!(id, Some("pod-xyz".to_string())); - } - - #[test] - fn test_resolve_instance_id_falls_back_to_container_name() { - let id = resolve_instance_id_from(None, None, Some("container-123")); - assert_eq!(id, Some("container-123".to_string())); - } -} -``` - -- [ ] **Step 2: Run test to verify it fails** - -Run: `cargo test -p datadog-metrics-collector -- test_resolve_instance_id 2>&1` -Expected: FAIL — `resolve_instance_id_from` not found - -- [ ] **Step 3: Implement `resolve_instance_id_from` and `resolve_instance_id`** - -Add above the `#[cfg(test)]` block: - -```rust -use dogstatsd::aggregator::AggregatorHandle; -use dogstatsd::metric::{Metric, MetricValue, SortedTags}; -use std::env; -use tracing::{debug, error, info}; - -const INSTANCE_METRIC: &str = "azure.functions.enhanced.instance"; - -/// Resolves the instance ID from explicit values (used by tests). 
-fn resolve_instance_id_from( - website_instance_id: Option<&str>, - website_pod_name: Option<&str>, - container_name: Option<&str>, -) -> Option { - website_instance_id - .or(website_pod_name) - .or(container_name) - .map(String::from) -} - -/// Resolves the instance ID from environment variables. -/// -/// Checks in order: -/// 1. `WEBSITE_INSTANCE_ID` (Elastic Premium / Premium plans) -/// 2. `WEBSITE_POD_NAME` (Flex Consumption plans) -/// 3. `CONTAINER_NAME` (Consumption plans) -fn resolve_instance_id() -> Option { - resolve_instance_id_from( - env::var("WEBSITE_INSTANCE_ID").ok().as_deref(), - env::var("WEBSITE_POD_NAME").ok().as_deref(), - env::var("CONTAINER_NAME").ok().as_deref(), - ) -} - -pub struct InstanceMetricsCollector { - aggregator: AggregatorHandle, - tags: Option, - instance_id: Option, -} - -impl InstanceMetricsCollector { - pub fn new(aggregator: AggregatorHandle, tags: Option) -> Self { - let instance_id = resolve_instance_id(); - if let Some(ref id) = instance_id { - info!("Instance ID resolved: {}", id); - } else { - debug!("No instance ID found, instance metric will not be submitted"); - } - Self { - aggregator, - tags, - instance_id, - } - } - - pub fn collect_and_submit(&self) { - let Some(ref instance_id) = self.instance_id else { - debug!("No instance ID available, skipping instance metric"); - return; - }; - - // Build tags: start with shared tags, add instance_id - let instance_tag = format!("instance_id:{}", instance_id); - let tag_string = match &self.tags { - Some(existing) => format!("{},{}", existing, instance_tag), - None => instance_tag, - }; - let tags = SortedTags::parse(&tag_string).ok(); - - let now = std::time::UNIX_EPOCH - .elapsed() - .map(|d| d.as_secs()) - .unwrap_or(0) - .try_into() - .unwrap_or(0); - - let metric = Metric::new( - INSTANCE_METRIC.into(), - MetricValue::gauge(1.0), - tags, - Some(now), - ); - - if let Err(e) = self.aggregator.insert_batch(vec![metric]) { - error!("Failed to insert instance metric: 
{}", e); - } - } -} -``` - -- [ ] **Step 4: Run tests to verify they pass** - -Run: `cargo test -p datadog-metrics-collector -- test_resolve_instance_id` -Expected: all 4 tests PASS - -- [ ] **Step 5: Verify crate compiles** - -Run: `cargo check -p datadog-metrics-collector` -Expected: success - -- [ ] **Step 6: Commit** - -```bash -git add crates/datadog-metrics-collector/src/instance.rs -git commit -m "feat(metrics-collector): add instance identity metric collector" -``` - ---- - -### Task 4: Add `azure.functions` prefix to origin classification - -**Files:** -- Modify: `crates/dogstatsd/src/origin.rs` - -The current `main` branch doesn't include `azure.functions` in the enhanced-service prefix check. The CPU branch added it. We need it so `azure.functions.enhanced.instance` gets classified as `ServerlessEnhanced`. - -- [ ] **Step 1: Write failing test for the new origin classification** - -Add this test to the `mod tests` block at the bottom of `crates/dogstatsd/src/origin.rs`: - -```rust - #[test] - fn test_find_metric_origin_azure_functions_enhanced() { - let tags = SortedTags::parse("origin:azurefunction").unwrap(); - let metric = Metric { - id: 0, - name: "azure.functions.enhanced.instance".into(), - value: MetricValue::Gauge(1.0), - tags: Some(tags.clone()), - timestamp: 0, - }; - let origin = metric.find_origin(tags).unwrap(); - assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, - OriginCategory::AzureFunctionsMetrics as u32 - ); - assert_eq!( - origin.origin_service as u32, - OriginService::ServerlessEnhanced as u32 - ); - } -``` - -- [ ] **Step 2: Run test to verify it fails** - -Run: `cargo test -p dogstatsd -- test_find_metric_origin_azure_functions_enhanced` -Expected: FAIL — `azure.functions` prefix not matched for `ServerlessEnhanced`, falls through to `ServerlessCustom` - -- [ ] **Step 3: Add `azure.functions` prefix constant and update matching** - -In 
`crates/dogstatsd/src/origin.rs`, add the constant: - -```rust -const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; -``` - -And update the service matching in `find_origin` to include it: - -```rust - } else if metric_prefix == AWS_LAMBDA_PREFIX - || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX - || metric_prefix == AZURE_FUNCTIONS_PREFIX - { -``` - -- [ ] **Step 4: Run test to verify it passes** - -Run: `cargo test -p dogstatsd -- test_find_metric_origin_azure_functions_enhanced` -Expected: PASS - -- [ ] **Step 5: Run all dogstatsd tests to check for regressions** - -Run: `cargo test -p dogstatsd` -Expected: all tests pass - -- [ ] **Step 6: Commit** - -```bash -git add crates/dogstatsd/src/origin.rs -git commit -m "feat(dogstatsd): classify azure.functions prefix as ServerlessEnhanced origin" -``` - ---- - -### Task 5: Wire `InstanceMetricsCollector` into `main.rs` - -**Files:** -- Modify: `crates/datadog-serverless-compat/Cargo.toml` -- Modify: `crates/datadog-serverless-compat/src/main.rs` - -This is the integration task. The main changes to `main.rs`: -1. Add `DD_ENHANCED_METRICS_ENABLED` env var check (Azure Functions only, default true, disabled on Windows) -2. Refactor aggregator creation to be shared between dogstatsd and enhanced metrics (same pattern as CPU branch) -3. Add a `tokio::select!` loop with separate flush and collection intervals -4. 
Create and run the `InstanceMetricsCollector` - -- [ ] **Step 1: Add `datadog-metrics-collector` dependency to `Cargo.toml`** - -Add to `[dependencies]` in `crates/datadog-serverless-compat/Cargo.toml`: - -```toml -datadog-metrics-collector = { path = "../datadog-metrics-collector" } -``` - -Add to `[features]`: - -```toml -windows-enhanced-metrics = ["datadog-metrics-collector/windows-enhanced-metrics"] -``` - -- [ ] **Step 2: Update `main.rs` — add import and collection interval constant** - -Add import near the top with other use statements: - -```rust -use datadog_metrics_collector::instance::InstanceMetricsCollector; -``` - -Add constant: - -```rust -const ENHANCED_METRICS_COLLECTION_INTERVAL_SECS: u64 = 10; -``` - -- [ ] **Step 3: Update `main.rs` — add `dd_enhanced_metrics` env var check** - -After the `dd_logs_enabled` / `dd_logs_port` env var reads (around line 121), add: - -```rust - // Only enable enhanced metrics for Linux Azure Functions - #[cfg(not(feature = "windows-enhanced-metrics"))] - let dd_enhanced_metrics = env_type == EnvironmentType::AzureFunction - && env::var("DD_ENHANCED_METRICS_ENABLED") - .map(|val| val.to_lowercase() != "false") - .unwrap_or(true); - - // Enhanced metrics are not yet supported in Windows environments - #[cfg(feature = "windows-enhanced-metrics")] - let dd_enhanced_metrics = false; -``` - -- [ ] **Step 4: Update `main.rs` — refactor aggregator to be shared** - -Replace the dogstatsd startup block (lines 185-207) and the code down through the flush loop with the shared-aggregator pattern from the CPU branch. The key structural change is: - -1. Create aggregator when `dd_use_dogstatsd || dd_enhanced_metrics` -2. Start DogStatsD listener separately (only if `dd_use_dogstatsd`) -3. Create `InstanceMetricsCollector` when enhanced metrics enabled -4. 
Use `tokio::select!` with both flush and collection intervals - -Replace lines 185-207 (the dogstatsd startup block) with: - -```rust - let needs_aggregator = dd_use_dogstatsd || dd_enhanced_metrics; - - // The aggregator is shared between dogstatsd and enhanced metrics. - // It is started independently so that either can be enabled without the other. - let (metrics_flusher, aggregator_handle) = if needs_aggregator { - debug!("Creating metrics flusher and aggregator"); - - let (flusher, handle) = - start_aggregator(dd_api_key.clone(), dd_site, https_proxy.clone(), dogstatsd_tags).await; - - if dd_use_dogstatsd { - debug!("Starting dogstatsd"); - let _ = start_dogstatsd_listener( - dd_dogstatsd_port, - handle.clone(), - dd_statsd_metric_namespace, - #[cfg(all(windows, feature = "windows-pipes"))] - dd_dogstatsd_windows_pipe_name.clone(), - ) - .await; - if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { - info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); - } else { - info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); - } - } else { - info!("dogstatsd disabled"); - } - (flusher, Some(handle)) - } else { - info!("dogstatsd and enhanced metrics disabled"); - (None, None) - }; -``` - -- [ ] **Step 5: Update `main.rs` — create instance collector** - -After the aggregator block, add: - -```rust - let instance_collector = if dd_enhanced_metrics && metrics_flusher.is_some() { - aggregator_handle.as_ref().map(|handle| { - let tags = datadog_metrics_collector::tags::build_enhanced_metrics_tags(); - InstanceMetricsCollector::new(handle.clone(), tags) - }) - } else { - if !dd_enhanced_metrics { - info!("Enhanced metrics disabled"); - } else { - info!("Enhanced metrics enabled but metrics flusher not found"); - } - None - }; -``` - -- [ ] **Step 6: Update `main.rs` — replace flush loop with `tokio::select!`** - -Replace the existing flush loop (from `let mut flush_interval` through end of `loop`) with: - -```rust - 
let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); - let mut enhanced_metrics_collection_interval = - interval(Duration::from_secs(ENHANCED_METRICS_COLLECTION_INTERVAL_SECS)); - flush_interval.tick().await; // discard first tick, which is instantaneous - enhanced_metrics_collection_interval.tick().await; - - // Builders for log batches that failed transiently in the previous flush - // cycle. They are redriven on the next cycle before new batches are sent. - let mut pending_log_retries: Vec = Vec::new(); - - loop { - tokio::select! { - _ = flush_interval.tick() => { - if let Some(metrics_flusher) = metrics_flusher.clone() { - debug!("Flushing dogstatsd metrics"); - tokio::spawn(async move { - metrics_flusher.flush().await; - }); - } - - if let Some(log_flusher) = log_flusher.as_ref() { - debug!("Flushing log agent"); - let retry_in = std::mem::take(&mut pending_log_retries); - let failed = log_flusher.flush(retry_in).await; - if !failed.is_empty() { - warn!( - "log agent flush failed for {} batch(es); will retry next cycle", - failed.len() - ); - pending_log_retries = failed; - } - } - } - _ = enhanced_metrics_collection_interval.tick() => { - if let Some(ref collector) = instance_collector { - collector.collect_and_submit(); - } - } - } - } -``` - -- [ ] **Step 7: Update `main.rs` — refactor `start_dogstatsd` into `start_aggregator` + `start_dogstatsd_listener`** - -Replace the existing `start_dogstatsd` function with two functions: - -```rust -async fn start_aggregator( - dd_api_key: Option, - dd_site: String, - https_proxy: Option, - dogstatsd_tags: &str, -) -> (Option, AggregatorHandle) { - #[allow(clippy::expect_used)] - let (service, handle) = AggregatorService::new( - SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), - CONTEXTS, - ) - .expect("Failed to create aggregator service"); - - tokio::spawn(service.run()); - - let metrics_flusher = match dd_api_key { - Some(dd_api_key) => { - let client = match 
build_metrics_client(https_proxy, DOGSTATSD_TIMEOUT_DURATION) { - Ok(client) => client, - Err(e) => { - error!("Failed to build HTTP client: {e}, won't flush metrics"); - return (None, handle); - } - }; - let metrics_intake_url_prefix = match Site::new(dd_site) - .map_err(|e| e.to_string()) - .and_then(|site| { - MetricsIntakeUrlPrefix::new(Some(site), None).map_err(|e| e.to_string()) - }) { - Ok(prefix) => prefix, - Err(e) => { - error!("Failed to create metrics intake URL: {e}, won't flush metrics"); - return (None, handle); - } - }; - - let metrics_flusher = Flusher::new(FlusherConfig { - api_key_factory: Arc::new(ApiKeyFactory::new(&dd_api_key)), - aggregator_handle: handle.clone(), - metrics_intake_url_prefix, - client, - retry_strategy: RetryStrategy::LinearBackoff(3, 1), - compression_level: CompressionLevel::try_from(6).unwrap_or_default(), - }); - Some(metrics_flusher) - } - None => { - error!("DD_API_KEY not set, won't flush metrics"); - None - } - }; - - (metrics_flusher, handle) -} - -async fn start_dogstatsd_listener( - port: u16, - handle: AggregatorHandle, - metric_namespace: Option<String>, - #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option<String>, -) -> CancellationToken { - #[cfg(all(windows, feature = "windows-pipes"))] - let dogstatsd_config = DogStatsDConfig { - host: AGENT_HOST.to_string(), - port, - metric_namespace, - windows_pipe_name, - so_rcvbuf: None, - buffer_size: None, - queue_size: None, - }; - - #[cfg(not(all(windows, feature = "windows-pipes")))] - let dogstatsd_config = DogStatsDConfig { - host: AGENT_HOST.to_string(), - port, - metric_namespace, - so_rcvbuf: None, - buffer_size: None, - queue_size: None, - }; - let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); - - let dogstatsd_client = DogStatsD::new( - &dogstatsd_config, - handle.clone(), - dogstatsd_cancel_token.clone(), - ) - .await; - - tokio::spawn(async move { - dogstatsd_client.spin().await; - }); - - dogstatsd_cancel_token -} -``` - -- [ 
] **Step 8: Remove unused import `warn` if no longer needed, clean up** - -Check if `warn` is still used (it is, for log flush failures). Remove the underscore prefix from the `_aggregator_handle` variable, since the handle is now used. - -- [ ] **Step 9: Verify everything compiles** - -Run: `cargo check -p datadog-serverless-compat` -Expected: success - -- [ ] **Step 10: Run all workspace tests** - -Run: `cargo test --workspace` -Expected: all tests pass - -- [ ] **Step 11: Commit** - -```bash -git add crates/datadog-serverless-compat/Cargo.toml crates/datadog-serverless-compat/src/main.rs -git commit -m "feat(serverless-compat): wire instance metrics collector into main loop" -``` - ---- - -### Task 6: Update CI workflows for `windows-enhanced-metrics` feature - -**Files:** -- Modify: `.github/workflows/build-datadog-serverless-compat.yml` -- Modify: `.github/workflows/cargo.yml` - -Windows builds need the `windows-enhanced-metrics` feature flag so that the feature-gated code compiles correctly. - -- [ ] **Step 1: Update `build-datadog-serverless-compat.yml`** - -Change the Windows build command from: - -```yaml -run: cargo build --release -p datadog-serverless-compat --features windows-pipes -``` - -to: - -```yaml -run: cargo build --release -p datadog-serverless-compat --features windows-pipes,windows-enhanced-metrics -``` - -- [ ] **Step 2: Update `cargo.yml`** - -Change the Windows test command from: - -```yaml -cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes -``` - -to: - -```yaml -cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes,datadog-serverless-compat/windows-enhanced-metrics -``` - -- [ ] **Step 3: Commit** - -```bash -git add .github/workflows/build-datadog-serverless-compat.yml .github/workflows/cargo.yml -git commit -m "ci: add windows-enhanced-metrics feature flag to CI builds" -``` - ---- - -### Task 7: Update `Cargo.lock` and `LICENSE-3rdparty.csv` - -**Files:** -- Modify: `Cargo.lock` 
(auto-generated) -- Modify: `LICENSE-3rdparty.csv` (if any new third-party deps) - -- [ ] **Step 1: Generate lock file** - -Run: `cargo generate-lockfile` or just `cargo check` to update `Cargo.lock` - -- [ ] **Step 2: Check if LICENSE-3rdparty.csv needs updating** - -Since we're only adding local crate dependencies (dogstatsd, libdd-common which are already in the dependency tree), this likely needs no changes. Verify by checking if the CI has a license check step and whether it passes. - -- [ ] **Step 3: Commit if changed** - -```bash -git add Cargo.lock -git commit -m "chore: update Cargo.lock for datadog-metrics-collector" -``` - ---- - -### Task 8: Final verification - -- [ ] **Step 1: Run full workspace check** - -Run: `cargo check --workspace` -Expected: success - -- [ ] **Step 2: Run full workspace tests** - -Run: `cargo test --workspace` -Expected: all tests pass - -- [ ] **Step 3: Run clippy** - -Run: `cargo clippy --workspace -- -D warnings` -Expected: no warnings - -- [ ] **Step 4: Verify Windows feature flag compiles** - -Run: `cargo check -p datadog-serverless-compat --features windows-enhanced-metrics,windows-pipes` -Expected: success (will use stub code paths) From f3a2022e7ac5927a8d0d14dd9358db33d97be771 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Fri, 17 Apr 2026 11:33:02 -0400 Subject: [PATCH 10/20] Don't check DD_ENHANCED_METRICS_ENABLED --- crates/datadog-serverless-compat/src/main.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index fc27df4..0d300aa 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -128,10 +128,7 @@ pub async fn main() { .and_then(|v| v.parse::().ok()) .unwrap_or(DEFAULT_LOG_INTAKE_PORT); - let dd_enhanced_metrics = env_type == EnvironmentType::AzureFunction - && env::var("DD_ENHANCED_METRICS_ENABLED") - .map(|val| 
val.to_lowercase() != "false") - .unwrap_or(true); + let instance_metric_enabled = env_type == EnvironmentType::AzureFunction; let dd_agent_stats_computation_enabled = env::var("DD_AGENT_STATS_COMPUTATION_ENABLED") @@ -221,7 +218,7 @@ pub async fn main() { } }); - let needs_aggregator = dd_use_dogstatsd || dd_enhanced_metrics; + let needs_aggregator = dd_use_dogstatsd || instance_metric_enabled; // The aggregator is shared between dogstatsd and enhanced metrics. // It is started independently so that either can be enabled without the other. @@ -261,17 +258,12 @@ pub async fn main() { (None, None) }; - let instance_collector = if dd_enhanced_metrics && metrics_flusher.is_some() { + let instance_collector = if instance_metric_enabled && metrics_flusher.is_some() { aggregator_handle.as_ref().and_then(|handle| { let tags = datadog_metrics_collector::tags::build_enhanced_metrics_tags(); InstanceMetricsCollector::new(handle.clone(), tags) }) } else { - if !dd_enhanced_metrics { - info!("Enhanced metrics disabled"); - } else { - info!("Enhanced metrics enabled but metrics flusher not found"); - } None }; From 2e2c2fa26a5ca78e989f5b52cbd7926550f09a51 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Thu, 23 Apr 2026 14:27:00 -0400 Subject: [PATCH 11/20] Resolve instance ID based on hosting plan --- .../datadog-metrics-collector/src/instance.rs | 105 +++++++++++++++--- 1 file changed, 87 insertions(+), 18 deletions(-) diff --git a/crates/datadog-metrics-collector/src/instance.rs b/crates/datadog-metrics-collector/src/instance.rs index 18f3a48..7581514 100644 --- a/crates/datadog-metrics-collector/src/instance.rs +++ b/crates/datadog-metrics-collector/src/instance.rs @@ -14,28 +14,42 @@ use tracing::{error, warn}; const INSTANCE_METRIC: &str = "azure.functions.enhanced.instance"; /// Resolves the instance ID from explicit values (used by tests). 
+/// +/// Picks the env var that matches the Azure integration metric's `instance` +/// tag for the current hosting plan (via `WEBSITE_SKU`), with fallback logic +/// if the preferred var is empty. fn resolve_instance_id_from( - website_instance_id: Option<&str>, - website_pod_name: Option<&str>, + website_sku: Option<&str>, container_name: Option<&str>, + website_pod_name: Option<&str>, + computer_name: Option<&str>, ) -> Option { - website_instance_id - .or(website_pod_name) - .or(container_name) - .map(String::from) + fn non_empty(s: Option<&str>) -> Option<&str> { + s.filter(|v| !v.is_empty()) + } + + let sku_preferred = match website_sku { + Some("FlexConsumption") | Some("Dynamic") => { + non_empty(container_name).or(non_empty(website_pod_name)) + } + Some(_) => non_empty(computer_name), + None => None, + }; + + sku_preferred + .or_else(|| non_empty(container_name)) + .or_else(|| non_empty(website_pod_name)) + .or_else(|| non_empty(computer_name)) + .map(|s| s.to_lowercase()) } /// Resolves the instance ID from environment variables. -/// -/// Checks in order: -/// 1. `WEBSITE_INSTANCE_ID` (Elastic Premium / Premium plans) -/// 2. `WEBSITE_POD_NAME` (Flex Consumption / Consumption plans) -/// 3. 
`CONTAINER_NAME` (Flex Consumption / Consumption plans) fn resolve_instance_id() -> Option { resolve_instance_id_from( - env::var("WEBSITE_INSTANCE_ID").ok().as_deref(), - env::var("WEBSITE_POD_NAME").ok().as_deref(), + env::var("WEBSITE_SKU").ok().as_deref(), env::var("CONTAINER_NAME").ok().as_deref(), + env::var("WEBSITE_POD_NAME").ok().as_deref(), + env::var("COMPUTERNAME").ok().as_deref(), ) } @@ -87,14 +101,69 @@ mod tests { use super::*; #[test] - fn test_resolve_instance_id_falls_back_to_pod_name() { - let id = resolve_instance_id_from(None, Some("pod-xyz"), Some("container-123")); + fn test_flex_consumption_uses_container_name() { + let id = resolve_instance_id_from( + Some("FlexConsumption"), + Some("0--abc-DEF"), + Some("0--abc-DEF"), + None, + ); + assert_eq!(id, Some("0--abc-def".to_string())); + } + + #[test] + fn test_flex_consumption_falls_back_to_pod_name_if_container_missing() { + let id = resolve_instance_id_from(Some("FlexConsumption"), None, Some("pod-XYZ"), None); assert_eq!(id, Some("pod-xyz".to_string())); } #[test] - fn test_resolve_instance_id_falls_back_to_container_name() { - let id = resolve_instance_id_from(None, None, Some("container-123")); - assert_eq!(id, Some("container-123".to_string())); + fn test_consumption_uses_container_name() { + let id = resolve_instance_id_from( + Some("Dynamic"), + Some("ABCD1234-111122223333444455"), + None, + None, + ); + assert_eq!(id, Some("abcd1234-111122223333444455".to_string())); + } + + #[test] + fn test_elastic_premium_uses_computer_name() { + let id = + resolve_instance_id_from(Some("ElasticPremium"), None, None, Some("ep0fakewk0000A1")); + assert_eq!(id, Some("ep0fakewk0000a1".to_string())); + } + + #[test] + fn test_dedicated_uses_computer_name() { + let id = resolve_instance_id_from(Some("PremiumV3"), None, None, Some("p3fakewk0000B2")); + assert_eq!(id, Some("p3fakewk0000b2".to_string())); + } + + #[test] + fn test_empty_string_is_treated_as_missing() { + // EP2 .NET exposes 
WEBSITE_INSTANCE_ID as empty; we should skip empties. + let id = + resolve_instance_id_from(Some("ElasticPremium"), Some(""), Some(""), Some("worker-1")); + assert_eq!(id, Some("worker-1".to_string())); + } + + #[test] + fn test_unknown_sku_falls_back_to_search_order() { + let id = resolve_instance_id_from(Some("SomeNewSku"), Some("container-1"), None, None); + assert_eq!(id, Some("container-1".to_string())); + } + + #[test] + fn test_missing_sku_falls_back_to_search_order() { + let id = resolve_instance_id_from(None, Some("container-1"), None, Some("worker-1")); + assert_eq!(id, Some("container-1".to_string())); + } + + #[test] + fn test_no_env_vars_returns_none() { + let id = resolve_instance_id_from(None, None, None, None); + assert_eq!(id, None); } } From 41cc05e8b164e1788850da3efcf00efb44e3ff49 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Thu, 23 Apr 2026 17:32:16 -0400 Subject: [PATCH 12/20] Add unit test for Windows fallback --- Cargo.lock | 34 ++----------------- .../datadog-metrics-collector/src/instance.rs | 8 +++++ 2 files changed, 11 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b27d633..3c96bf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1495,40 +1495,12 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "libdd-common" -version = "3.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" -dependencies = [ - "anyhow", - "bytes", - "cc", - "const_format", - "futures", - "futures-core", - "futures-util", - "hex", - "http", - "http-body", - "http-body-util", - "hyper", - "hyper-util", - "libc", - "nix", - "pin-project", - "regex", - "serde", - "static_assertions", - "thiserror 1.0.69", - "tokio", - "tower-service", - "windows-sys 0.52.0", -] + [[package]] name = "libdd-common" -version = "4.0.0" -source = 
"git+https://github.com/DataDog/libdatadog?rev=d7eef8031192d0ee79ba64cd824804c5a57abacf#d7eef8031192d0ee79ba64cd824804c5a57abacf" +version = "3.0.2" +source = "git+https://github.com/DataDog/libdatadog?rev=27aa92cfeeca073d8730a8b4974bd3fdef7ddf3a#27aa92cfeeca073d8730a8b4974bd3fdef7ddf3a" dependencies = [ "anyhow", "bytes", diff --git a/crates/datadog-metrics-collector/src/instance.rs b/crates/datadog-metrics-collector/src/instance.rs index 7581514..59789d8 100644 --- a/crates/datadog-metrics-collector/src/instance.rs +++ b/crates/datadog-metrics-collector/src/instance.rs @@ -166,4 +166,12 @@ mod tests { let id = resolve_instance_id_from(None, None, None, None); assert_eq!(id, None); } + + // On Windows Consumption we've observed CONTAINER_NAME and WEBSITE_POD_NAME + // unset but COMPUTERNAME set + #[test] + fn test_windows_consumption_falls_through_to_computer_name() { + let id = resolve_instance_id_from(Some("Dynamic"), None, None, Some("10-20-30-40")); + assert_eq!(id, Some("10-20-30-40".to_string())); + } } From 408c9413b78da7d25729dccb7d916d51a2767c93 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Fri, 24 Apr 2026 10:52:54 -0400 Subject: [PATCH 13/20] Use Tag::new() from libdd_common, make unknown a constant --- .../datadog-metrics-collector/src/instance.rs | 5 +-- crates/datadog-metrics-collector/src/tags.rs | 43 +++++++++++++------ 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/crates/datadog-metrics-collector/src/instance.rs b/crates/datadog-metrics-collector/src/instance.rs index 59789d8..60e6f3e 100644 --- a/crates/datadog-metrics-collector/src/instance.rs +++ b/crates/datadog-metrics-collector/src/instance.rs @@ -16,8 +16,8 @@ const INSTANCE_METRIC: &str = "azure.functions.enhanced.instance"; /// Resolves the instance ID from explicit values (used by tests). 
/// /// Picks the env var that matches the Azure integration metric's `instance` -/// tag for the current hosting plan (via `WEBSITE_SKU`), with fallback logic -/// if the preferred var is empty. +/// tag for the current hosting plan with fallback logic +/// if the preferred source is empty. fn resolve_instance_id_from( website_sku: Option<&str>, container_name: Option<&str>, @@ -143,7 +143,6 @@ mod tests { #[test] fn test_empty_string_is_treated_as_missing() { - // EP2 .NET exposes WEBSITE_INSTANCE_ID as empty; we should skip empties. let id = resolve_instance_id_from(Some("ElasticPremium"), Some(""), Some(""), Some("worker-1")); assert_eq!(id, Some("worker-1".to_string())); diff --git a/crates/datadog-metrics-collector/src/tags.rs b/crates/datadog-metrics-collector/src/tags.rs index c6db691..ab47211 100644 --- a/crates/datadog-metrics-collector/src/tags.rs +++ b/crates/datadog-metrics-collector/src/tags.rs @@ -6,8 +6,12 @@ //! Tags are attached to all enhanced metrics submitted by the metrics collector. use dogstatsd::metric::SortedTags; -use libdd_common::azure_app_services; +use libdd_common::{azure_app_services, tag::Tag}; use std::env; +use tracing::warn; + +/// `libdd_common::azure_app_services` returns this value when the corresponding Azure metadata isn't populated. +const AAS_UNKNOWN_VALUE: &str = "unknown"; /// Builds the common tags for all enhanced metrics. /// @@ -18,17 +22,27 @@ use std::env; /// The DogStatsD origin tag (e.g. `origin:azurefunction`) is added by the metrics aggregator, /// not here. 
pub fn build_enhanced_metrics_tags() -> Option { - let mut tag_parts = Vec::new(); + let mut tags = Vec::new(); + + fn push(tags: &mut Vec, key: &str, value: &str) { + if value.is_empty() { + return; + } + // Tag::new validates that the key and value are not empty and do not start or end with a colon + match Tag::new(key, value) { + Ok(t) => tags.push(t), + Err(e) => warn!("Skipping invalid tag {key}:{value}: {e}"), + } + } if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION { - let aas_tags = [ + for (name, value) in [ ("resource_group", aas_metadata.get_resource_group()), ("subscription_id", aas_metadata.get_subscription_id()), ("name", aas_metadata.get_site_name()), - ]; - for (name, value) in aas_tags { - if value != "unknown" { - tag_parts.push(format!("{}:{}", name, value)); + ] { + if value != AAS_UNKNOWN_VALUE { + push(&mut tags, name, value); } } } @@ -41,15 +55,18 @@ pub fn build_enhanced_metrics_tags() -> Option { ("version", "DD_VERSION"), ("serverless_compat_version", "DD_SERVERLESS_COMPAT_VERSION"), ] { - if let Ok(val) = env::var(env_var) - && !val.is_empty() - { - tag_parts.push(format!("{}:{}", tag_name, val)); + if let Ok(val) = env::var(env_var) { + push(&mut tags, tag_name, &val); } } - if tag_parts.is_empty() { + if tags.is_empty() { return None; } - SortedTags::parse(&tag_parts.join(",")).ok() + let joined = tags + .iter() + .map(|t| t.as_ref()) + .collect::>() + .join(","); + SortedTags::parse(&joined).ok() } From db0cad9b34595e1dd8baf2472bd9fc83b8ea2486 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Fri, 24 Apr 2026 11:19:04 -0400 Subject: [PATCH 14/20] Refactor build_enhanced_metrics_tags and add unit tests for build_tags --- crates/datadog-metrics-collector/src/tags.rs | 79 ++++++++++++++++---- 1 file changed, 65 insertions(+), 14 deletions(-) diff --git a/crates/datadog-metrics-collector/src/tags.rs b/crates/datadog-metrics-collector/src/tags.rs index ab47211..7558984 100644 --- 
a/crates/datadog-metrics-collector/src/tags.rs +++ b/crates/datadog-metrics-collector/src/tags.rs @@ -22,18 +22,7 @@ const AAS_UNKNOWN_VALUE: &str = "unknown"; /// The DogStatsD origin tag (e.g. `origin:azurefunction`) is added by the metrics aggregator, /// not here. pub fn build_enhanced_metrics_tags() -> Option { - let mut tags = Vec::new(); - - fn push(tags: &mut Vec, key: &str, value: &str) { - if value.is_empty() { - return; - } - // Tag::new validates that the key and value are not empty and do not start or end with a colon - match Tag::new(key, value) { - Ok(t) => tags.push(t), - Err(e) => warn!("Skipping invalid tag {key}:{value}: {e}"), - } - } + let mut pairs: Vec<(&'static str, String)> = Vec::new(); if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION { for (name, value) in [ @@ -42,7 +31,7 @@ pub fn build_enhanced_metrics_tags() -> Option { ("name", aas_metadata.get_site_name()), ] { if value != AAS_UNKNOWN_VALUE { - push(&mut tags, name, value); + pairs.push((name, value.to_string())); } } } @@ -56,10 +45,26 @@ pub fn build_enhanced_metrics_tags() -> Option { ("serverless_compat_version", "DD_SERVERLESS_COMPAT_VERSION"), ] { if let Ok(val) = env::var(env_var) { - push(&mut tags, tag_name, &val); + pairs.push((tag_name, val)); } } + build_tags(pairs) +} + +fn build_tags(pairs: impl IntoIterator) -> Option { + let mut tags: Vec = Vec::new(); + for (key, value) in pairs { + if value.is_empty() { + continue; + } + // Tag::new validates the combined "key:value" string: it must be + // non-empty and not start or end with a colon + match Tag::new(key, &value) { + Ok(t) => tags.push(t), + Err(e) => warn!("Skipping invalid tag {key}:{value}: {e}"), + } + } if tags.is_empty() { return None; } @@ -70,3 +75,49 @@ pub fn build_enhanced_metrics_tags() -> Option { .join(","); SortedTags::parse(&joined).ok() } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_build_tags_returns_none_when_no_pairs() { + let pairs: Vec<(&'static 
str, String)> = Vec::new(); + assert!(build_tags(pairs).is_none()); + } + + #[test] + fn test_build_tags_returns_none_when_all_values_empty() { + let pairs = vec![("service", String::new()), ("env", String::new())]; + assert!(build_tags(pairs).is_none()); + } + + #[test] + fn test_build_tags_skips_empty_values() { + let pairs = vec![("service", String::new()), ("env", "dev".to_string())]; + let tags = build_tags(pairs).unwrap().to_strings(); + assert_eq!(tags, vec!["env:dev"]); + } + + #[test] + fn test_build_tags_includes_all_nonempty_pairs() { + let pairs = vec![ + ("service", "svc-1".to_string()), + ("env", "dev".to_string()), + ("version", "1.2.3".to_string()), + ]; + let mut tags = build_tags(pairs).unwrap().to_strings(); + tags.sort(); + assert_eq!(tags, vec!["env:dev", "service:svc-1", "version:1.2.3"]); + } + + #[test] + fn test_build_tags_rejects_trailing_colon_values() { + let pairs = vec![ + ("service", "svc-1:".to_string()), + ("env", "dev".to_string()), + ]; + let tags = build_tags(pairs).unwrap().to_strings(); + assert_eq!(tags, vec!["env:dev"]); + } +} From c95bfe1a0b74906a6a201545ac7a4d187d76cfdc Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Wed, 29 Apr 2026 17:55:12 -0400 Subject: [PATCH 15/20] Rename Azure-specific files --- .../src/{instance.rs => azure_instance.rs} | 0 .../datadog-metrics-collector/src/{tags.rs => azure_tags.rs} | 0 crates/datadog-metrics-collector/src/lib.rs | 4 ++-- crates/datadog-serverless-compat/src/main.rs | 4 ++-- 4 files changed, 4 insertions(+), 4 deletions(-) rename crates/datadog-metrics-collector/src/{instance.rs => azure_instance.rs} (100%) rename crates/datadog-metrics-collector/src/{tags.rs => azure_tags.rs} (100%) diff --git a/crates/datadog-metrics-collector/src/instance.rs b/crates/datadog-metrics-collector/src/azure_instance.rs similarity index 100% rename from crates/datadog-metrics-collector/src/instance.rs rename to crates/datadog-metrics-collector/src/azure_instance.rs diff --git 
a/crates/datadog-metrics-collector/src/tags.rs b/crates/datadog-metrics-collector/src/azure_tags.rs similarity index 100% rename from crates/datadog-metrics-collector/src/tags.rs rename to crates/datadog-metrics-collector/src/azure_tags.rs diff --git a/crates/datadog-metrics-collector/src/lib.rs b/crates/datadog-metrics-collector/src/lib.rs index 5f22d37..173d62a 100644 --- a/crates/datadog-metrics-collector/src/lib.rs +++ b/crates/datadog-metrics-collector/src/lib.rs @@ -7,5 +7,5 @@ #![cfg_attr(not(test), deny(clippy::todo))] #![cfg_attr(not(test), deny(clippy::unimplemented))] -pub mod instance; -pub mod tags; +pub mod azure_instance; +pub mod azure_tags; diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 0d300aa..2e4edb5 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -41,7 +41,7 @@ use dogstatsd::{ util::parse_metric_namespace, }; -use datadog_metrics_collector::instance::InstanceMetricsCollector; +use datadog_metrics_collector::azure_instance::InstanceMetricsCollector; use dogstatsd::metric::{EMPTY_TAGS, SortedTags}; use tokio_util::sync::CancellationToken; @@ -260,7 +260,7 @@ pub async fn main() { let instance_collector = if instance_metric_enabled && metrics_flusher.is_some() { aggregator_handle.as_ref().and_then(|handle| { - let tags = datadog_metrics_collector::tags::build_enhanced_metrics_tags(); + let tags = datadog_metrics_collector::azure_tags::build_enhanced_metrics_tags(); InstanceMetricsCollector::new(handle.clone(), tags) }) } else { From dd5ac2bbb9eacd97f5f6fe488186213a78f0a89b Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Thu, 30 Apr 2026 11:16:13 -0400 Subject: [PATCH 16/20] Add unit tests for starting metrics components --- crates/datadog-serverless-compat/src/main.rs | 97 ++++++++++++++++++-- 1 file changed, 90 insertions(+), 7 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs 
b/crates/datadog-serverless-compat/src/main.rs index 2e4edb5..359d0e3 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -218,12 +218,11 @@ pub async fn main() { } }); - let needs_aggregator = dd_use_dogstatsd || instance_metric_enabled; + let enabled_metrics_components = + decide_metrics_components(dd_use_dogstatsd, instance_metric_enabled); - // The aggregator is shared between dogstatsd and enhanced metrics. - // It is started independently so that either can be enabled without the other. - // Only dogstatsd needs the dogstatsd listener - let (metrics_flusher, aggregator_handle) = if needs_aggregator { + // The metrics aggregator is started independently so that dogstatsd and enhanced metrics can be enabled independently. + let (metrics_flusher, aggregator_handle) = if enabled_metrics_components.start_aggregator { debug!("Creating metrics flusher and aggregator"); let (flusher, handle) = start_aggregator( @@ -234,7 +233,7 @@ pub async fn main() { ) .await; - if dd_use_dogstatsd { + if enabled_metrics_components.start_dogstatsd_listener { debug!("Starting dogstatsd"); let _ = start_dogstatsd_listener( dd_dogstatsd_port, @@ -258,7 +257,9 @@ pub async fn main() { (None, None) }; - let instance_collector = if instance_metric_enabled && metrics_flusher.is_some() { + let instance_collector = if enabled_metrics_components.start_instance_metrics_collector + && metrics_flusher.is_some() + { aggregator_handle.as_ref().and_then(|handle| { let tags = datadog_metrics_collector::azure_tags::build_enhanced_metrics_tags(); InstanceMetricsCollector::new(handle.clone(), tags) @@ -521,6 +522,33 @@ fn start_log_agent( Some((flusher, handle)) } +/// Records which metrics components are enabled and should be started. 
+#[derive(Debug, PartialEq)] +struct EnabledMetricsComponents { + start_aggregator: bool, + start_dogstatsd_listener: bool, + start_instance_metrics_collector: bool, +} + +/// Determines which components should be started based on configuration. +/// +/// The metrics aggregator is shared between dogstatsd and enhanced metrics, +/// so it's started if either is enabled. +fn decide_metrics_components( + dd_use_dogstatsd: bool, + instance_metric_enabled: bool, +) -> EnabledMetricsComponents { + let start_dogstatsd_listener = dd_use_dogstatsd; + let start_instance_metrics_collector = instance_metric_enabled; + let start_aggregator = start_dogstatsd_listener || start_instance_metrics_collector; + + EnabledMetricsComponents { + start_aggregator, + start_dogstatsd_listener, + start_instance_metrics_collector, + } +} + #[cfg(test)] mod log_agent_integration_tests { use datadog_logs_agent::{AggregatorService, IntakeEntry, LogServer, LogServerConfig}; @@ -627,3 +655,58 @@ mod log_agent_integration_tests { handle.shutdown().expect("shutdown"); } } + +#[cfg(test)] +mod metrics_components_tests { + use super::{EnabledMetricsComponents, decide_metrics_components}; + + #[test] + fn test_decide_metrics_components() { + let cases: &[(bool, bool, EnabledMetricsComponents)] = &[ + ( + false, + false, + EnabledMetricsComponents { + start_aggregator: false, + start_dogstatsd_listener: false, + start_instance_metrics_collector: false, + }, + ), + ( + true, + false, + EnabledMetricsComponents { + start_aggregator: true, + start_dogstatsd_listener: true, + start_instance_metrics_collector: false, + }, + ), + ( + false, + true, + EnabledMetricsComponents { + start_aggregator: true, + start_dogstatsd_listener: false, + start_instance_metrics_collector: true, + }, + ), + ( + true, + true, + EnabledMetricsComponents { + start_aggregator: true, + start_dogstatsd_listener: true, + start_instance_metrics_collector: true, + }, + ), + ]; + + for (dogstatsd, instance, expected) in cases { + let 
actual = decide_metrics_components(*dogstatsd, *instance); + assert_eq!( + &actual, expected, + "case (dd_use_dogstatsd={dogstatsd}, instance_metric_enabled={instance})" + ); + } + } +} From 850345b6b9ac4c63c2e63f87e1cd72060be5f5bb Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Thu, 30 Apr 2026 12:08:52 -0400 Subject: [PATCH 17/20] Couple metrics aggregator and flusher together --- crates/datadog-serverless-compat/src/main.rs | 101 ++++++++++--------- 1 file changed, 52 insertions(+), 49 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 359d0e3..0541859 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -221,52 +221,54 @@ pub async fn main() { let enabled_metrics_components = decide_metrics_components(dd_use_dogstatsd, instance_metric_enabled); - // The metrics aggregator is started independently so that dogstatsd and enhanced metrics can be enabled independently. - let (metrics_flusher, aggregator_handle) = if enabled_metrics_components.start_aggregator { - debug!("Creating metrics flusher and aggregator"); - - let (flusher, handle) = start_aggregator( - dd_api_key.clone(), - dd_site, - https_proxy.clone(), - dogstatsd_tags, - ) - .await; - - if enabled_metrics_components.start_dogstatsd_listener { - debug!("Starting dogstatsd"); - let _ = start_dogstatsd_listener( - dd_dogstatsd_port, - handle.clone(), - dd_statsd_metric_namespace, - #[cfg(all(windows, feature = "windows-pipes"))] - dd_dogstatsd_windows_pipe_name.clone(), + // The metrics aggregator and flusher are started together and shared between dogstatsd and enhanced metrics, + // so they are started if either is enabled. 
+ let (metrics_flusher, aggregator_handle) = + if enabled_metrics_components.start_metrics_aggregator_and_flusher { + debug!("Creating metrics flusher and aggregator"); + + let (flusher, handle) = start_aggregator( + dd_api_key.clone(), + dd_site, + https_proxy.clone(), + dogstatsd_tags, ) .await; - if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { - info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + + if enabled_metrics_components.start_dogstatsd_listener { + debug!("Starting dogstatsd"); + let _ = start_dogstatsd_listener( + dd_dogstatsd_port, + handle.clone(), + dd_statsd_metric_namespace, + #[cfg(all(windows, feature = "windows-pipes"))] + dd_dogstatsd_windows_pipe_name.clone(), + ) + .await; + if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { + info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + } else { + info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + } } else { - info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + info!("dogstatsd disabled"); } + (flusher, Some(handle)) } else { - info!("dogstatsd disabled"); - } - (flusher, Some(handle)) - } else { - info!("dogstatsd and enhanced metrics disabled"); - (None, None) - }; + info!("dogstatsd and enhanced metrics disabled"); + (None, None) + }; - let instance_collector = if enabled_metrics_components.start_instance_metrics_collector - && metrics_flusher.is_some() - { - aggregator_handle.as_ref().and_then(|handle| { - let tags = datadog_metrics_collector::azure_tags::build_enhanced_metrics_tags(); - InstanceMetricsCollector::new(handle.clone(), tags) - }) - } else { - None - }; + let instance_collector: Option = + if enabled_metrics_components.start_instance_metrics_collector && metrics_flusher.is_some() + { + aggregator_handle.as_ref().and_then(|handle| { + let tags = datadog_metrics_collector::azure_tags::build_enhanced_metrics_tags(); + 
InstanceMetricsCollector::new(handle.clone(), tags) + }) + } else { + None + }; let (log_flusher, _log_aggregator_handle): (Option, Option) = if dd_logs_enabled { @@ -525,25 +527,26 @@ fn start_log_agent( /// Records which metrics components are enabled and should be started. #[derive(Debug, PartialEq)] struct EnabledMetricsComponents { - start_aggregator: bool, + start_metrics_aggregator_and_flusher: bool, start_dogstatsd_listener: bool, start_instance_metrics_collector: bool, } /// Determines which components should be started based on configuration. /// -/// The metrics aggregator is shared between dogstatsd and enhanced metrics, -/// so it's started if either is enabled. +/// The metrics aggregator and flusher are started together and shared between dogstatsd and enhanced metrics, +/// so they are started if either is enabled. fn decide_metrics_components( dd_use_dogstatsd: bool, instance_metric_enabled: bool, ) -> EnabledMetricsComponents { let start_dogstatsd_listener = dd_use_dogstatsd; let start_instance_metrics_collector = instance_metric_enabled; - let start_aggregator = start_dogstatsd_listener || start_instance_metrics_collector; + let start_metrics_aggregator_and_flusher = + start_dogstatsd_listener || start_instance_metrics_collector; EnabledMetricsComponents { - start_aggregator, + start_metrics_aggregator_and_flusher, start_dogstatsd_listener, start_instance_metrics_collector, } @@ -667,7 +670,7 @@ mod metrics_components_tests { false, false, EnabledMetricsComponents { - start_aggregator: false, + start_metrics_aggregator_and_flusher: false, start_dogstatsd_listener: false, start_instance_metrics_collector: false, }, @@ -676,7 +679,7 @@ mod metrics_components_tests { true, false, EnabledMetricsComponents { - start_aggregator: true, + start_metrics_aggregator_and_flusher: true, start_dogstatsd_listener: true, start_instance_metrics_collector: false, }, @@ -685,7 +688,7 @@ mod metrics_components_tests { false, true, EnabledMetricsComponents { - 
start_aggregator: true, + start_metrics_aggregator_and_flusher: true, start_dogstatsd_listener: false, start_instance_metrics_collector: true, }, @@ -694,7 +697,7 @@ mod metrics_components_tests { true, true, EnabledMetricsComponents { - start_aggregator: true, + start_metrics_aggregator_and_flusher: true, start_dogstatsd_listener: true, start_instance_metrics_collector: true, }, From 9762040c7964a87bde333af61ee860d0a249100b Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Thu, 30 Apr 2026 12:11:30 -0400 Subject: [PATCH 18/20] Add clarifying comment --- crates/datadog-serverless-compat/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 0541859..2401e09 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -259,6 +259,7 @@ pub async fn main() { (None, None) }; + // Skip enhanced metrics collection if we can't flush metrics let instance_collector: Option = if enabled_metrics_components.start_instance_metrics_collector && metrics_flusher.is_some() { From 3dbdd4c508f8bb574a508661a79276b1bb4f92f2 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Fri, 1 May 2026 15:20:16 -0400 Subject: [PATCH 19/20] nit: fix formatting and Cargo.lock --- Cargo.lock | 34 ++++++++++++++++++-- crates/datadog-serverless-compat/src/main.rs | 1 - 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c96bf2..b27d633 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1495,12 +1495,40 @@ dependencies = [ "windows-sys 0.52.0", ] - +[[package]] +name = "libdd-common" +version = "3.0.1" +source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" +dependencies = [ + "anyhow", + "bytes", + "cc", + "const_format", + "futures", + "futures-core", + "futures-util", + "hex", + "http", + "http-body", + "http-body-util", + "hyper", 
+ "hyper-util", + "libc", + "nix", + "pin-project", + "regex", + "serde", + "static_assertions", + "thiserror 1.0.69", + "tokio", + "tower-service", + "windows-sys 0.52.0", +] [[package]] name = "libdd-common" -version = "3.0.2" -source = "git+https://github.com/DataDog/libdatadog?rev=27aa92cfeeca073d8730a8b4974bd3fdef7ddf3a#27aa92cfeeca073d8730a8b4974bd3fdef7ddf3a" +version = "4.0.0" +source = "git+https://github.com/DataDog/libdatadog?rev=d7eef8031192d0ee79ba64cd824804c5a57abacf#d7eef8031192d0ee79ba64cd824804c5a57abacf" dependencies = [ "anyhow", "bytes", diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 2401e09..0bd9065 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -130,7 +130,6 @@ pub async fn main() { let instance_metric_enabled = env_type == EnvironmentType::AzureFunction; - let dd_agent_stats_computation_enabled = env::var("DD_AGENT_STATS_COMPUTATION_ENABLED") .map(|val| val.to_lowercase() == "true") .unwrap_or(false); From 07f53cce31c3334ae3fd5894d5f65b41abc7b705 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Fri, 1 May 2026 15:56:10 -0400 Subject: [PATCH 20/20] Update datadog-metrics-collector libdatadog rev to d7eef8031192d0ee79ba64cd824804c5a57abacf --- Cargo.lock | 32 +-------------------- crates/datadog-metrics-collector/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b27d633..62816c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -492,7 +492,7 @@ name = "datadog-metrics-collector" version = "0.1.0" dependencies = [ "dogstatsd", - "libdd-common 3.0.1", + "libdd-common 4.0.0", "tracing", ] @@ -1495,36 +1495,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "libdd-common" -version = "3.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=8c88979985154d6d97c0fc2ca9039682981eacad#8c88979985154d6d97c0fc2ca9039682981eacad" -dependencies = [ - 
"anyhow", - "bytes", - "cc", - "const_format", - "futures", - "futures-core", - "futures-util", - "hex", - "http", - "http-body", - "http-body-util", - "hyper", - "hyper-util", - "libc", - "nix", - "pin-project", - "regex", - "serde", - "static_assertions", - "thiserror 1.0.69", - "tokio", - "tower-service", - "windows-sys 0.52.0", -] - [[package]] name = "libdd-common" version = "4.0.0" diff --git a/crates/datadog-metrics-collector/Cargo.toml b/crates/datadog-metrics-collector/Cargo.toml index 7e6d0b9..9bdce6f 100644 --- a/crates/datadog-metrics-collector/Cargo.toml +++ b/crates/datadog-metrics-collector/Cargo.toml @@ -8,4 +8,4 @@ description = "Collector to read, compute, and submit enhanced metrics in Server [dependencies] dogstatsd = { path = "../dogstatsd", default-features = true } tracing = { version = "0.1", default-features = false } -libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", default-features = false } +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "d7eef8031192d0ee79ba64cd824804c5a57abacf", default-features = false }