diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index e69734115..f9563ba7b 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -484,6 +484,7 @@ dependencies = [ "ddsketch-agent", "dogstatsd", "figment", + "flate2", "fnv", "futures", "hex", @@ -518,6 +519,7 @@ dependencies = [ "rand 0.8.6", "regex", "reqwest", + "rmp-serde", "rustls", "rustls-native-certs", "rustls-pemfile", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index b77902083..f0fd58388 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -92,6 +92,9 @@ tower = { version = "0.5", features = ["util"] } mock_instant = "0.6" serial_test = "3.1" tempfile = "3.20" +# fake-intake test harness: decode msgpack+gzip stats payloads on arrival +rmp-serde = { version = "1.3.1", default-features = false } +flate2 = { version = "1.1", default-features = false, features = ["rust_backend"] } [build-dependencies] # No external dependencies needed for the build script diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 8ee88bb26..ffcb89537 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -1131,6 +1131,7 @@ fn start_trace_agent( stats_aggregator.clone(), Arc::clone(config), trace_http_client.clone(), + libdd_trace_utils::config_utils::trace_stats_url(&config.site), )); let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {}); diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index 1d4b4ae4d..14bdf1d80 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -14,13 +14,14 @@ use crate::traces::stats_aggregator::StatsAggregator; use dogstatsd::api_key::ApiKeyFactory; use libdd_common::Endpoint; use libdd_trace_protobuf::pb; -use libdd_trace_utils::{config_utils::trace_stats_url, stats_utils}; +use libdd_trace_utils::stats_utils; use tracing::{debug, error}; pub struct StatsFlusher { aggregator: Arc<Mutex<StatsAggregator>>,
config: Arc<Config>, api_key_factory: Arc<ApiKeyFactory>, + stats_url: String, endpoint: OnceCell<Endpoint>, http_client: HttpClient, } @@ -32,11 +33,13 @@ impl StatsFlusher { aggregator: Arc<Mutex<StatsAggregator>>, config: Arc<Config>, http_client: HttpClient, + stats_url: String, ) -> Self { StatsFlusher { aggregator, config, api_key_factory, + stats_url, endpoint: OnceCell::new(), http_client, } } @@ -64,9 +67,8 @@ impl StatsFlusher { .endpoint .get_or_init({ move || async move { - let stats_url = trace_stats_url(&self.config.site); Endpoint { - url: hyper::Uri::from_str(&stats_url) + url: hyper::Uri::from_str(&self.stats_url) .expect("can't make URI from stats url, exiting"), api_key: Some(api_key_clone.into()), timeout_ms: self.config.flush_timeout * S_TO_MS, @@ -92,8 +94,6 @@ impl StatsFlusher { } }; - let stats_url = trace_stats_url(&self.config.site); - for attempt in 1..=FLUSH_RETRY_COUNT { let start = std::time::Instant::now(); let resp = stats_utils::send_stats_payload_with_client( @@ -108,14 +108,16 @@ match resp { Ok(()) => { debug!( - "STATS | Successfully flushed stats to {stats_url} in {} ms (attempt {attempt}/{FLUSH_RETRY_COUNT})", + "STATS | Successfully flushed stats to {} in {} ms (attempt {attempt}/{FLUSH_RETRY_COUNT})", + endpoint.url, elapsed.as_millis() ); return None; } Err(e) => { debug!( - "STATS | Failed to send stats to {stats_url} in {} ms (attempt {attempt}/{FLUSH_RETRY_COUNT}): {e:?}", + "STATS | Failed to send stats to {} in {} ms (attempt {attempt}/{FLUSH_RETRY_COUNT}): {e:?}", + endpoint.url, elapsed.as_millis() ); } diff --git a/bottlecap/tests/apm_integration_test.rs b/bottlecap/tests/apm_integration_test.rs new file mode 100644 index 000000000..b154cd263 --- /dev/null +++ b/bottlecap/tests/apm_integration_test.rs @@ -0,0 +1,267 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Payload-level APM integration tests using the in-process fake-intake. +//! +//!
Covers the two flush paths bottlecap uses to forward APM data to the +//! Datadog backend: +//! +//! - `StatsFlusher` → msgpack+gzip `pb::StatsPayload` on `/api/v0.2/stats` +//! - `TraceFlusher` → protobuf `pb::AgentPayload` on `/api/v0.2/traces` +//! +//! Each test spins up a `FakeIntake`, points the flusher at it, triggers a +//! flush, then decodes the captured payload and asserts on concrete fields. +//! This is what APMSVLS-496 phase 1 unblocks: regression coverage for +//! payload-level changes that `body_contains`-style mocks can't catch. + +use std::str::FromStr; +use std::sync::Arc; + +use bottlecap::config::Config; +use bottlecap::traces::http_client::create_client; +use bottlecap::traces::stats_aggregator::StatsAggregator; +use bottlecap::traces::stats_concentrator_service::StatsConcentratorService; +use bottlecap::traces::stats_flusher::StatsFlusher; +use bottlecap::traces::trace_aggregator::SendDataBuilderInfo; +use bottlecap::traces::trace_aggregator_service::AggregatorService; +use bottlecap::traces::trace_flusher::TraceFlusher; +use dogstatsd::api_key::ApiKeyFactory; +use libdd_common::Endpoint; +use libdd_trace_protobuf::pb; +use libdd_trace_utils::send_data::SendDataBuilder; +use libdd_trace_utils::trace_utils::TracerHeaderTags; +use libdd_trace_utils::tracer_payload::TracerPayloadCollection; +use tokio::sync::Mutex; + +#[path = "common/fake_intake.rs"] +mod fake_intake; + +use fake_intake::FakeIntake; + +const DD_API_KEY: &str = "my_test_key"; + +fn header_tags() -> TracerHeaderTags<'static> { + TracerHeaderTags { + lang: "rust", + lang_version: "1.80", + lang_interpreter: "rustc", + lang_vendor: "datadog", + tracer_version: "test", + container_id: "", + client_computed_top_level: true, + client_computed_stats: true, + dropped_p0_traces: 0, + dropped_p0_spans: 0, + } +} + +fn test_config() -> Arc<Config> { + Arc::new(Config { + api_key: DD_API_KEY.to_string(), + site: "datadoghq.com".to_string(), + ..Config::default() + }) +} + +fn endpoint_for(url:
&str, api_key: &str) -> Endpoint { + Endpoint { + url: hyper::Uri::from_str(url).expect("test endpoint URL must parse"), + api_key: Some(api_key.to_string().into()), + timeout_ms: 5_000, + test_token: None, + use_system_resolver: false, + } +} + +#[tokio::test] +async fn stats_payload_roundtrip_through_fake_intake() { + let fake_intake = FakeIntake::start().await; + let config = test_config(); + let http_client = create_client(None, None, false).expect("failed to create http client"); + + // StatsFlusher::send() works directly on a Vec<pb::ClientStatsPayload>, + // bypassing the aggregator/concentrator. We still need to supply a + // StatsAggregator because the struct holds one; an idle concentrator + // is fine since send() never touches the aggregator. + let (concentrator_service, concentrator_handle) = + StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(concentrator_service.run()); + let aggregator = Arc::new(Mutex::new(StatsAggregator::new_with_concentrator( + concentrator_handle, + ))); + + let api_key_factory = Arc::new(ApiKeyFactory::new(DD_API_KEY)); + let flusher = StatsFlusher::new( + api_key_factory, + aggregator, + config, + http_client, + fake_intake.stats_url(), + ); + + let client_stats = pb::ClientStatsPayload { + hostname: "test-host".to_string(), + env: "test-env".to_string(), + version: "1.2.3".to_string(), + lang: "rust".to_string(), + tracer_version: "test-tracer".to_string(), + runtime_id: "00000000-0000-0000-0000-000000000001".to_string(), + sequence: 7, + service: "fake-intake-test-service".to_string(), + stats: vec![pb::ClientStatsBucket { + start: 1_700_000_000_000_000_000, + duration: 10_000_000_000, + agent_time_shift: 0, + stats: vec![pb::ClientGroupedStats { + service: "fake-intake-test-service".to_string(), + name: "handler".to_string(), + resource: "GET /fake".to_string(), + r#type: "web".to_string(), + http_status_code: 200, + db_type: String::new(), + hits: 3, + errors: 0, + duration: 42, + ok_summary: vec![0, 0, 0], + error_summary:
vec![0, 0, 0], + synthetics: false, + top_level_hits: 3, + span_kind: "server".to_string(), + peer_tags: vec!["peer.service:upstream".to_string()], + is_trace_root: pb::Trilean::True.into(), + grpc_status_code: String::new(), + http_endpoint: "/fake".to_string(), + http_method: "GET".to_string(), + }], + }], + agent_aggregation: String::new(), + container_id: String::new(), + tags: vec![], + git_commit_sha: String::new(), + image_tag: String::new(), + process_tags_hash: 0, + process_tags: String::new(), + }; + + let failed = flusher.send(vec![client_stats]).await; + assert!( + failed.is_none(), + "stats send reported a retry-able failure: {failed:?}", + ); + + let captured = fake_intake.stats_payloads(); + assert_eq!(captured.len(), 1, "expected exactly one StatsPayload"); + + let payload = &captured[0]; + assert!( + payload.client_computed, + "bottlecap is the agent; client_computed must be true", + ); + assert_eq!(payload.stats.len(), 1); + let inner = &payload.stats[0]; + // libdd_trace_utils::stats_utils::construct_stats_payload zeroes hostname on every + // input before wrapping, so the sent value is "" regardless of what the caller set. 
+ assert_eq!(inner.hostname, ""); + assert_eq!(inner.env, "test-env"); + assert_eq!(inner.version, "1.2.3"); + assert_eq!(inner.service, "fake-intake-test-service"); + assert_eq!(inner.sequence, 7); + assert_eq!(inner.stats.len(), 1); + let bucket = &inner.stats[0]; + assert_eq!(bucket.stats.len(), 1); + let grouped = &bucket.stats[0]; + assert_eq!(grouped.name, "handler"); + assert_eq!(grouped.resource, "GET /fake"); + assert_eq!(grouped.hits, 3); + assert_eq!(grouped.top_level_hits, 3); + assert_eq!(grouped.span_kind, "server"); + assert_eq!(grouped.peer_tags, vec!["peer.service:upstream".to_string()]); + assert_eq!(grouped.http_status_code, 200); + assert_eq!(grouped.http_method, "GET"); + assert_eq!(grouped.http_endpoint, "/fake"); + assert_eq!(grouped.is_trace_root, pb::Trilean::True as i32); +} + +#[tokio::test] +async fn trace_payload_roundtrip_through_fake_intake() { + let fake_intake = FakeIntake::start().await; + let config = test_config(); + let http_client = create_client(None, None, false).expect("failed to create http client"); + let endpoint = endpoint_for(&fake_intake.traces_url(), DD_API_KEY); + + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + tokio::spawn(aggregator_service.run()); + + let span = pb::Span { + service: "fake-intake-trace-service".to_string(), + name: "web.request".to_string(), + resource: "GET /fake".to_string(), + trace_id: 0x1111_1111_1111_1111, + span_id: 0x2222_2222_2222_2222, + parent_id: 0, + start: 1_700_000_000_000_000_000, + duration: 5_000_000, + error: 0, + r#type: "web".to_string(), + ..pb::Span::default() + }; + let chunk = pb::TraceChunk { + priority: 1, + origin: String::new(), + spans: vec![span], + tags: Default::default(), + dropped_trace: false, + }; + let tracer_payload = pb::TracerPayload { + container_id: String::new(), + language_name: "rust".to_string(), + language_version: "1.80".to_string(), + tracer_version: "test".to_string(), + runtime_id: 
"00000000-0000-0000-0000-000000000002".to_string(), + chunks: vec![chunk], + tags: Default::default(), + env: "test-env".to_string(), + hostname: String::new(), + app_version: "1.2.3".to_string(), + }; + + let tags = header_tags(); + let builder = SendDataBuilder::new( + 1, + TracerPayloadCollection::V07(vec![tracer_payload]), + tags.clone(), + &endpoint, + ); + aggregator_handle + .insert_payload(SendDataBuilderInfo::new(builder, 1, tags.into())) + .expect("insert_payload must succeed"); + + let api_key_factory = Arc::new(ApiKeyFactory::new(DD_API_KEY)); + let flusher = TraceFlusher::new(aggregator_handle, config, api_key_factory, http_client); + + let failed = flusher.flush(None).await; + assert!( + failed.is_none(), + "trace flush reported a retry-able failure: {failed:?}", + ); + + let captured = fake_intake.trace_payloads(); + assert_eq!(captured.len(), 1, "expected exactly one AgentPayload"); + + let payload = &captured[0]; + assert_eq!(payload.tracer_payloads.len(), 1); + let tp = &payload.tracer_payloads[0]; + assert_eq!(tp.language_name, "rust"); + assert_eq!(tp.env, "test-env"); + assert_eq!(tp.app_version, "1.2.3"); + assert_eq!(tp.chunks.len(), 1); + let chunk = &tp.chunks[0]; + assert_eq!(chunk.priority, 1); + assert_eq!(chunk.spans.len(), 1); + let span = &chunk.spans[0]; + assert_eq!(span.service, "fake-intake-trace-service"); + assert_eq!(span.name, "web.request"); + assert_eq!(span.resource, "GET /fake"); + assert_eq!(span.trace_id, 0x1111_1111_1111_1111); + assert_eq!(span.span_id, 0x2222_2222_2222_2222); +} diff --git a/bottlecap/tests/common/fake_intake.rs b/bottlecap/tests/common/fake_intake.rs new file mode 100644 index 000000000..8f027676a --- /dev/null +++ b/bottlecap/tests/common/fake_intake.rs @@ -0,0 +1,228 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! In-process fake Datadog intake for APM payload-level integration tests. +//! +//! 
Spawns an axum server on a random local port that accepts the same APM +//! endpoints bottlecap flushes to, decodes msgpack / protobuf payloads on +//! arrival, and stores the decoded structs. Tests then call typed query +//! methods to assert on payload contents. +//! +//! Endpoints supported: +//! +//! - `POST /api/v0.2/stats`: msgpack, gzip-compressed, `pb::StatsPayload` +//! - `POST /api/v0.2/traces`: protobuf (optionally zstd-compressed), `pb::AgentPayload` +//! +//! Prototype for APMSVLS-494 phase 1. If the API proves out, this file gets +//! extracted into the shared `datadog/apm-agent-parity-rs` repo in phase 2. + +use std::io::Read; +use std::sync::{Arc, Mutex}; + +use axum::{ + Router, + body::Bytes, + extract::State, + http::{HeaderMap, StatusCode}, + routing::post, +}; +use libdd_trace_protobuf::pb; +use prost::Message; +use tokio::net::TcpListener; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; + +/// Captured, decoded APM payloads for a single test run. +#[derive(Default)] +struct Captured { + stats: Vec<pb::StatsPayload>, + traces: Vec<pb::AgentPayload>, +} + +/// Shared server state. The axum handlers write to the mutexes; tests read +/// via `FakeIntake::stats_payloads()` / `trace_payloads()`. +struct SharedState { + captured: Mutex<Captured>, +} + +/// A running fake-intake server. Drop shuts it down. +pub struct FakeIntake { + base_url: String, + state: Arc<SharedState>, + shutdown_tx: Option<oneshot::Sender<()>>, + task: Option<JoinHandle<()>>, +} + +impl FakeIntake { + /// Bind to `127.0.0.1` on an OS-assigned port and start serving.
+ pub async fn start() -> Self { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("fake_intake: failed to bind listener"); + let addr = listener + .local_addr() + .expect("fake_intake: failed to read bound address"); + let base_url = format!("http://{addr}"); + + let state = Arc::new(SharedState { + captured: Mutex::new(Captured::default()), + }); + + let router = Router::new() + .route("/api/v0.2/stats", post(handle_stats)) + .route("/api/v0.2/traces", post(handle_traces)) + .with_state(Arc::clone(&state)); + + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let task = tokio::spawn(async move { + axum::serve(listener, router) + .with_graceful_shutdown(async move { + let _ = shutdown_rx.await; + }) + .await + .expect("fake_intake: axum server error"); + }); + + Self { + base_url, + state, + shutdown_tx: Some(shutdown_tx), + task: Some(task), + } + } + + /// Full URL for the stats endpoint. + #[must_use] + pub fn stats_url(&self) -> String { + format!("{}/api/v0.2/stats", self.base_url) + } + + /// Full URL for the traces endpoint. + #[must_use] + pub fn traces_url(&self) -> String { + format!("{}/api/v0.2/traces", self.base_url) + } + + /// All `StatsPayload`s captured so far, in arrival order. + #[must_use] + pub fn stats_payloads(&self) -> Vec<pb::StatsPayload> { + self.state + .captured + .lock() + .expect("fake_intake: stats mutex poisoned") + .stats + .clone() + } + + /// All `AgentPayload`s captured so far, in arrival order.
+ #[must_use] + pub fn trace_payloads(&self) -> Vec<pb::AgentPayload> { + self.state + .captured + .lock() + .expect("fake_intake: traces mutex poisoned") + .traces + .clone() + } +} + +impl Drop for FakeIntake { + fn drop(&mut self) { + if let Some(tx) = self.shutdown_tx.take() { + let _ = tx.send(()); + } + if let Some(task) = self.task.take() { + task.abort(); + } + } +} + +async fn handle_stats( + State(state): State<Arc<SharedState>>, + headers: HeaderMap, + body: Bytes, +) -> StatusCode { + let decoded = match decompress(&headers, &body) { + Ok(d) => d, + Err(e) => { + eprintln!("{e}"); + return StatusCode::BAD_REQUEST; + } + }; + match rmp_serde::from_slice::<pb::StatsPayload>(&decoded) { + Ok(payload) => { + state + .captured + .lock() + .expect("fake_intake: stats mutex poisoned") + .stats + .push(payload); + StatusCode::ACCEPTED + } + Err(err) => { + eprintln!("fake_intake: failed to decode StatsPayload msgpack: {err}"); + StatusCode::BAD_REQUEST + } + } +} + +async fn handle_traces( + State(state): State<Arc<SharedState>>, + headers: HeaderMap, + body: Bytes, +) -> StatusCode { + let decoded = match decompress(&headers, &body) { + Ok(d) => d, + Err(e) => { + eprintln!("{e}"); + return StatusCode::BAD_REQUEST; + } + }; + match pb::AgentPayload::decode(decoded.as_slice()) { + Ok(payload) => { + state + .captured + .lock() + .expect("fake_intake: traces mutex poisoned") + .traces + .push(payload); + StatusCode::ACCEPTED + } + Err(err) => { + eprintln!("fake_intake: failed to decode AgentPayload protobuf: {err}"); + StatusCode::BAD_REQUEST + } + } +} + +/// Decompress a request body based on its `Content-Encoding` header. +/// Supports `gzip` and `zstd`. An unknown or absent encoding is treated as +/// identity: the body is returned unchanged.
+fn decompress(headers: &HeaderMap, body: &Bytes) -> Result<Vec<u8>, String> { + let encoding = headers + .get("content-encoding") + .and_then(|v| v.to_str().ok()) + .unwrap_or("") + .to_ascii_lowercase(); + + match encoding.as_str() { + "gzip" => { + let mut decoder = flate2::read::GzDecoder::new(body.as_ref()); + let mut out = Vec::new(); + decoder + .read_to_end(&mut out) + .map_err(|e| format!("fake_intake: gzip decode failed: {e}"))?; + Ok(out) + } + "zstd" => zstd::stream::decode_all(body.as_ref()) + .map_err(|e| format!("fake_intake: zstd decode failed: {e}")), + _ => { + if !encoding.is_empty() { + eprintln!( + "fake_intake: unrecognized Content-Encoding '{encoding}', treating as identity" + ); + } + Ok(body.to_vec()) + } + } +} diff --git a/bottlecap/tests/common/mod.rs b/bottlecap/tests/common/mod.rs index 8b1378917..521e0d805 100644 --- a/bottlecap/tests/common/mod.rs +++ b/bottlecap/tests/common/mod.rs @@ -1 +1,2 @@ - +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 8f619fdb1..551942a43 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -9,8 +9,6 @@ use httpmock::prelude::*; use std::collections::HashMap; use std::sync::Arc; -mod common; - #[tokio::test] async fn test_logs() { let arch = match std::env::consts::ARCH { diff --git a/bottlecap/tests/metrics_integration_test.rs b/bottlecap/tests/metrics_integration_test.rs index a57fd3fb3..5456a5d73 100644 --- a/bottlecap/tests/metrics_integration_test.rs +++ b/bottlecap/tests/metrics_integration_test.rs @@ -9,8 +9,6 @@ use dogstatsd::metric::SortedTags; use httpmock::prelude::*; use std::sync::Arc; -mod common; - #[tokio::test] async fn test_enhanced_metrics() { let dd_api_key = "my_test_key";