diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs index 6523c8569..e0433b1e0 100644 --- a/bottlecap/src/traces/stats_concentrator_service.rs +++ b/bottlecap/src/traces/stats_concentrator_service.rs @@ -12,6 +12,66 @@ use tracing::error; const S_TO_NS: u64 = 1_000_000_000; const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds +/// Span kinds eligible for stats computation, matching the Go agent's default +/// `ComputeStatsBySpanKind: true` behavior. +/// Reference: `datadog-agent/pkg/trace/stats/span_concentrator.go` (`KindsComputed`) +/// +/// TODO: Move to datadog-agent-config in serverless-components so both bottlecap and +/// serverless-compat can share this list. +const STATS_ELIGIBLE_SPAN_KINDS: [&str; 4] = ["client", "consumer", "producer", "server"]; + +/// Default peer tag keys for stats aggregation, matching the Go agent's `basePeerTags` +/// derived from pkg/trace/semantics/mappings.json via the 16 peer tag concepts. +/// Reference: `datadog-agent/pkg/trace/config/peer_tags.go` (`peerTagConcepts` + `basePeerTags`) +/// +/// TODO: Move to datadog-agent-config in serverless-components so both bottlecap and +/// serverless-compat can share this list. +const DEFAULT_PEER_TAG_KEYS: [&str; 43] = [ + "_dd.base_service", + "active_record.db.vendor", + "amqp.destination", + "amqp.exchange", + "amqp.queue", + "aws.queue.name", + "aws.s3.bucket", + "bucketname", + "cassandra.keyspace", + "db.cassandra.contact.points", + "db.couchbase.seed.nodes", + "db.hostname", + "db.instance", + "db.name", + "db.namespace", + "db.system", + "db.type", + "dns.hostname", + "grpc.host", + "hostname", + "http.host", + "http.server_name", + "messaging.destination", + "messaging.destination.name", + "messaging.kafka.bootstrap.servers", + "messaging.rabbitmq.exchange", + "messaging.system", + "mongodb.db", + "msmq.queue.path", + "net.peer.name", + "network.destination.ip", + "network.destination.name", + "out.host", + "peer.hostname", + "peer.service", + "queuename", + "rpc.service", + "rpc.system", + "sequel.db.vendor", + "server.address", + "streamname", + "tablename", + "topicname", +]; + #[derive(Debug, thiserror::Error)] pub enum StatsError { #[error("Failed to send command to concentrator: {0}")] @@ -113,12 +173,17 @@ impl StatsConcentratorService { pub fn new(config: Arc) -> (Self, StatsConcentratorHandle) { let (tx, rx) = mpsc::unbounded_channel(); let handle = StatsConcentratorHandle::new(tx); - // TODO: set span_kinds_stats_computed and peer_tag_keys let concentrator = SpanConcentrator::new( Duration::from_nanos(BUCKET_DURATION_NS), SystemTime::now(), - vec![], - vec![], + STATS_ELIGIBLE_SPAN_KINDS + .iter() + .map(ToString::to_string) + .collect(), + DEFAULT_PEER_TAG_KEYS + .iter() + .map(ToString::to_string) + .collect(), ); let service: StatsConcentratorService = Self { concentrator, @@ -192,3 +257,121 @@ impl StatsConcentratorService { } } } + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use std::collections::HashMap; + + /// Create a `pb::Span` with the given meta tags and metrics. + /// The span is non-root (`parent_id=1`) and not measured, so it will only be + /// eligible for stats if `span_kinds_stats_computed` includes its `span.kind`. + fn create_span_kind_span(span_kind: &str, meta: Vec<(&str, &str)>) -> pb::Span { + let now_ns = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + let mut meta_map: HashMap = meta + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + meta_map.insert("span.kind".to_string(), span_kind.to_string()); + pb::Span { + service: "test-service".to_string(), + name: "test-op".to_string(), + resource: "test-resource".to_string(), + trace_id: 1, + span_id: 2, + parent_id: 1, // non-root + start: now_ns, + duration: 100, + error: 0, + r#type: "web".to_string(), + meta: meta_map, + metrics: HashMap::new(), // no _top_level, no _dd.measured + meta_struct: HashMap::new(), + span_links: vec![], + span_events: vec![], + } + } + + /// A non-root, non-measured span with `span.kind`="client" should produce stats + /// when `ComputeStatsBySpanKind` is enabled (i.e. `span_kinds_stats_computed` is + /// populated). With the current empty vec, this span is silently dropped. + #[tokio::test] + async fn test_span_kind_stats_computed() { + let config = Arc::new(Config::default()); + let (service, handle) = StatsConcentratorService::new(config); + tokio::spawn(service.run()); + + let span = create_span_kind_span("client", vec![]); + handle.add(&span).unwrap(); + + let result = handle.flush(true).await.unwrap(); + + assert!( + result.is_some(), + "Expected stats for a client span, but got None. \ + span.kind-based eligibility is not working." + ); + let payload = result.unwrap(); + let all_stats: Vec<_> = payload.stats.iter().flat_map(|b| &b.stats).collect(); + assert!( + !all_stats.is_empty(), + "Expected at least one grouped stats entry for the client span." + ); + let client_stats: Vec<_> = all_stats + .iter() + .filter(|s| s.span_kind == "client") + .collect(); + assert!( + !client_stats.is_empty(), + "Expected a stats entry with span_kind='client'." + ); + } + + /// A client span with peer tag meta keys (`db.instance`, `db.system`) should produce + /// stats with non-empty `peer_tags` when `peer_tag_keys` is configured. With the + /// current empty vec, `peer_tags` in the output will always be empty. + #[tokio::test] + async fn test_peer_tags_populated() { + let config = Arc::new(Config::default()); + let (service, handle) = StatsConcentratorService::new(config); + tokio::spawn(service.run()); + + let span = create_span_kind_span( + "client", + vec![("db.instance", "i-1234"), ("db.system", "postgres")], + ); + handle.add(&span).unwrap(); + + let result = handle.flush(true).await.unwrap(); + + assert!( + result.is_some(), + "Expected stats for a client span with peer tags, but got None. \ + span.kind-based eligibility is not working." + ); + let payload = result.unwrap(); + let all_stats: Vec<_> = payload.stats.iter().flat_map(|b| &b.stats).collect(); + let stats_with_peer_tags: Vec<_> = all_stats + .iter() + .filter(|s| !s.peer_tags.is_empty()) + .collect(); + assert!( + !stats_with_peer_tags.is_empty(), + "Expected at least one stats entry with non-empty peer_tags, \ + but all entries have empty peer_tags." + ); + let peer_tags = &stats_with_peer_tags[0].peer_tags; + assert!( + peer_tags.iter().any(|t| t.starts_with("db.instance:")), + "Expected peer_tags to contain db.instance, got: {peer_tags:?}" + ); + assert!( + peer_tags.iter().any(|t| t.starts_with("db.system:")), + "Expected peer_tags to contain db.system, got: {peer_tags:?}" + ); + } +}