Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 186 additions & 3 deletions bottlecap/src/traces/stats_concentrator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,66 @@ use tracing::error;
/// Number of nanoseconds in one second.
const S_TO_NS: u64 = 1_000_000_000;
/// Bucket duration in nanoseconds; `StatsConcentratorService::new` converts it with
/// `Duration::from_nanos` and hands it to `SpanConcentrator::new`.
const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds

/// Span kinds eligible for stats computation, matching the Go agent's default
/// `ComputeStatsBySpanKind: true` behavior.
/// Reference: `datadog-agent/pkg/trace/stats/span_concentrator.go` (`KindsComputed`)
///
/// `StatsConcentratorService::new` converts these to owned `String`s and passes them to
/// `SpanConcentrator::new`. The unit tests in this file rely on it: a non-root,
/// non-measured span whose `span.kind` is `"client"` must still produce stats.
///
/// TODO: Move to datadog-agent-config in serverless-components so both bottlecap and
/// serverless-compat can share this list.
const STATS_ELIGIBLE_SPAN_KINDS: [&str; 4] = ["client", "consumer", "producer", "server"];

/// Default peer tag keys for stats aggregation, matching the Go agent's `basePeerTags`
/// derived from pkg/trace/semantics/mappings.json via the 16 peer tag concepts.
/// Reference: `datadog-agent/pkg/trace/config/peer_tags.go` (`peerTagConcepts` + `basePeerTags`)
///
/// `StatsConcentratorService::new` converts these to owned `String`s and passes them to
/// `SpanConcentrator::new`. The unit tests in this file check that `db.instance` and
/// `db.system` set on a client span show up in the resulting stats entry's `peer_tags`.
///
/// Entries are listed in alphabetical order. The array length is part of the type, so it
/// must be updated whenever a key is added or removed.
///
/// TODO: Move to datadog-agent-config in serverless-components so both bottlecap and
/// serverless-compat can share this list.
const DEFAULT_PEER_TAG_KEYS: [&str; 43] = [
"_dd.base_service",
"active_record.db.vendor",
"amqp.destination",
"amqp.exchange",
"amqp.queue",
"aws.queue.name",
"aws.s3.bucket",
"bucketname",
"cassandra.keyspace",
"db.cassandra.contact.points",
"db.couchbase.seed.nodes",
"db.hostname",
"db.instance",
"db.name",
"db.namespace",
"db.system",
"db.type",
"dns.hostname",
"grpc.host",
"hostname",
"http.host",
"http.server_name",
"messaging.destination",
"messaging.destination.name",
"messaging.kafka.bootstrap.servers",
"messaging.rabbitmq.exchange",
"messaging.system",
"mongodb.db",
"msmq.queue.path",
"net.peer.name",
"network.destination.ip",
"network.destination.name",
"out.host",
"peer.hostname",
"peer.service",
"queuename",
"rpc.service",
"rpc.system",
"sequel.db.vendor",
"server.address",
"streamname",
"tablename",
"topicname",
];

#[derive(Debug, thiserror::Error)]
pub enum StatsError {
#[error("Failed to send command to concentrator: {0}")]
Expand Down Expand Up @@ -113,12 +173,17 @@ impl StatsConcentratorService {
pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
let (tx, rx) = mpsc::unbounded_channel();
let handle = StatsConcentratorHandle::new(tx);
// TODO: set span_kinds_stats_computed and peer_tag_keys
let concentrator = SpanConcentrator::new(
Duration::from_nanos(BUCKET_DURATION_NS),
SystemTime::now(),
vec![],
vec![],
STATS_ELIGIBLE_SPAN_KINDS
.iter()
.map(ToString::to_string)
.collect(),
DEFAULT_PEER_TAG_KEYS
.iter()
.map(ToString::to_string)
.collect(),
);
let service: StatsConcentratorService = Self {
concentrator,
Expand Down Expand Up @@ -192,3 +257,121 @@ impl StatsConcentratorService {
}
}
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a `pb::Span` carrying the given `meta` tags plus a `span.kind` tag.
    ///
    /// The span is non-root (`parent_id = 1`) and its `metrics` map is empty (no
    /// `_top_level`, no `_dd.measured`), so it will only be eligible for stats if
    /// `span_kinds_stats_computed` includes its `span.kind`.
    ///
    /// `span_kind` is inserted after `meta` has been collected, so it overrides any
    /// `span.kind` entry supplied through `meta`.
    fn create_span_kind_span(span_kind: &str, meta: Vec<(&str, &str)>) -> pb::Span {
        let since_epoch = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap();
        // `as_nanos()` yields a `u128`; use a checked conversion rather than a silently
        // truncating `as` cast.
        let now_ns = i64::try_from(since_epoch.as_nanos()).unwrap();

        let mut meta_map: HashMap<String, String> = meta
            .into_iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        meta_map.insert("span.kind".to_string(), span_kind.to_string());

        pb::Span {
            service: "test-service".to_string(),
            name: "test-op".to_string(),
            resource: "test-resource".to_string(),
            trace_id: 1,
            span_id: 2,
            parent_id: 1, // non-root
            start: now_ns,
            duration: 100,
            error: 0,
            r#type: "web".to_string(),
            meta: meta_map,
            metrics: HashMap::new(), // no _top_level, no _dd.measured
            meta_struct: HashMap::new(),
            span_links: vec![],
            span_events: vec![],
        }
    }

    /// A non-root, non-measured span with `span.kind`="client" should produce stats,
    /// because `StatsConcentratorService::new` hands `STATS_ELIGIBLE_SPAN_KINDS` to the
    /// concentrator. This guards against regressing to an empty span-kind list, in
    /// which case the span would not be eligible for stats.
    #[tokio::test]
    async fn test_span_kind_stats_computed() {
        let config = Arc::new(Config::default());
        let (service, handle) = StatsConcentratorService::new(config);
        tokio::spawn(service.run());

        let span = create_span_kind_span("client", vec![]);
        handle.add(&span).unwrap();

        // Force a flush so the bucket holding the span is emitted immediately.
        let result = handle.flush(true).await.unwrap();

        assert!(
            result.is_some(),
            "Expected stats for a client span, but got None. \
             span.kind-based eligibility is not working."
        );
        let payload = result.unwrap();
        let all_stats: Vec<_> = payload.stats.iter().flat_map(|b| &b.stats).collect();
        assert!(
            !all_stats.is_empty(),
            "Expected at least one grouped stats entry for the client span."
        );
        assert!(
            all_stats.iter().any(|s| s.span_kind == "client"),
            "Expected a stats entry with span_kind='client'."
        );
    }

    /// A client span with peer tag meta keys (`db.instance`, `db.system`) should produce
    /// stats with non-empty `peer_tags`, because `StatsConcentratorService::new` hands
    /// `DEFAULT_PEER_TAG_KEYS` to the concentrator. This guards against regressing to an
    /// empty peer-tag key list, in which case `peer_tags` would stay empty.
    #[tokio::test]
    async fn test_peer_tags_populated() {
        let config = Arc::new(Config::default());
        let (service, handle) = StatsConcentratorService::new(config);
        tokio::spawn(service.run());

        let span = create_span_kind_span(
            "client",
            vec![("db.instance", "i-1234"), ("db.system", "postgres")],
        );
        handle.add(&span).unwrap();

        // Force a flush so the bucket holding the span is emitted immediately.
        let result = handle.flush(true).await.unwrap();

        assert!(
            result.is_some(),
            "Expected stats for a client span with peer tags, but got None. \
             span.kind-based eligibility is not working."
        );
        let payload = result.unwrap();

        // Look at the first grouped entry that carries peer tags; no indexing needed.
        let entry_with_peer_tags = payload
            .stats
            .iter()
            .flat_map(|b| &b.stats)
            .find(|s| !s.peer_tags.is_empty());
        assert!(
            entry_with_peer_tags.is_some(),
            "Expected at least one stats entry with non-empty peer_tags, \
             but all entries have empty peer_tags."
        );
        let peer_tags = &entry_with_peer_tags.unwrap().peer_tags;
        assert!(
            peer_tags.iter().any(|t| t.starts_with("db.instance:")),
            "Expected peer_tags to contain db.instance, got: {peer_tags:?}"
        );
        assert!(
            peer_tags.iter().any(|t| t.starts_with("db.system:")),
            "Expected peer_tags to contain db.system, got: {peer_tags:?}"
        );
    }
}
Loading