Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3d6fce1
Add InvocationProcessorHandle::noop() for test-mode binary
lucaspimentel Apr 23, 2026
62122d0
fix(bottlecap): replace unwrap() with expect() in noop() tests to sat…
lucaspimentel Apr 23, 2026
8a70de8
refactor(bottlecap): promote start_trace_agent to library crate
lucaspimentel Apr 23, 2026
f49e75a
refactor(bottlecap): add TraceAgent::with_flushing_service for test-m…
lucaspimentel Apr 23, 2026
3af7ae9
fix: return 500 from /flush handler on panic
lucaspimentel Apr 23, 2026
fb67a5d
refactor(startup): remove redundant re-export and allow attr
lucaspimentel Apr 23, 2026
7c02abf
refactor(startup): remove unnecessary clippy::type_complexity allow
lucaspimentel Apr 23, 2026
d15e2e5
refactor(traces): re-export TraceAgentPipeline from traces module
lucaspimentel Apr 23, 2026
49a9a80
refactor(noop): match real service channel capacity
lucaspimentel Apr 23, 2026
bab8963
docs(trace_agent): explain /flush panic isolation
lucaspimentel Apr 23, 2026
1be55e7
docs(startup): warn about leaked tasks if agent not spawned
lucaspimentel Apr 23, 2026
ecafa3a
refactor(startup): consolidate tokio::sync imports
lucaspimentel Apr 23, 2026
445b601
fix(trace_agent): add timeout and panic isolation to POST /flush
lucaspimentel Apr 23, 2026
6b8fbc8
test(noop): cover PlatformRuntimeDone and PlatformReport variants
lucaspimentel Apr 23, 2026
88c8e30
style: apply rustfmt
lucaspimentel Apr 23, 2026
02630c6
fix(trace_agent): abort timed-out flush task to bound execution
lucaspimentel Apr 23, 2026
9b6c1d2
refactor(startup): replace TraceAgentPipeline tuple with named struct
lucaspimentel Apr 24, 2026
c5f4a70
refactor(startup): move out of traces to top-level bottlecap::startup
lucaspimentel Apr 24, 2026
5a56b75
fix(trace_agent): return 504 directly after aborting flush task
lucaspimentel Apr 24, 2026
2ad997c
test(noop): call on_platform_runtime_done/on_platform_report via handle
lucaspimentel Apr 24, 2026
d9ac24d
docs(startup,trace_agent,noop): describe behavior, not the consumer
lucaspimentel Apr 24, 2026
45e70ea
refactor(trace_agent): replace flushing_service Option with RouterExt…
lucaspimentel Apr 24, 2026
a9f1c84
test(trace_agent): exercise RouterExtension seam through make_router
lucaspimentel Apr 24, 2026
05c8d55
docs(noop): fix inaccurate consequence described in exhaustive-match …
lucaspimentel Apr 24, 2026
6c9bcbc
docs(startup): count all four background tasks in build_trace_agent c…
lucaspimentel Apr 24, 2026
68477be
fix(trace_agent): RouterExtension::extend returns Result to surface e…
lucaspimentel Apr 24, 2026
3727b31
test(noop): guard all five request-response variants with an explicit…
lucaspimentel Apr 24, 2026
70f7906
style(trace_agent): apply rustfmt to RouterExtension::extend signatures
lucaspimentel Apr 24, 2026
240674e
docs(trace_agent,startup): clarify RouterExtension err propagation
lucaspimentel Apr 28, 2026
53b246c
feat(bottlecap): gate InvocationProcessorHandle::noop() behind test-m…
lucaspimentel Apr 28, 2026
149d9d4
docs(processor_service): document tokio runtime requirement on noop()
lucaspimentel Apr 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bottlecap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ fips = [
"rustls/fips",
"rustls-native-certs",
]
# Exposes test-only constructors (for example,
# `InvocationProcessorHandle::noop()`) to the upcoming testmode binary.
# Not enabled in `default` or `fips`, so the items it gates do not appear
# in production builds.
test-mode = []

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)'] }
136 changes: 5 additions & 131 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,10 @@ use bottlecap::{
provider::Provider as TagProvider,
},
traces::{
http_client as trace_http_client,
propagation::DatadogCompositePropagator,
proxy_aggregator,
proxy_flusher::Flusher as ProxyFlusher,
span_dedup_service,
stats_aggregator::StatsAggregator,
stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService},
stats_flusher,
stats_concentrator_service::StatsConcentratorHandle,
stats_generator::StatsGenerator,
stats_processor, trace_agent,
trace_aggregator::SendDataBuilderInfo,
trace_aggregator_service::{
AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService,
},
trace_flusher,
trace_processor::{self, SendingTraceProcessor},
},
};
Expand All @@ -95,7 +84,6 @@ use dogstatsd::{
flusher::{Flusher as MetricsFlusher, FlusherConfig as MetricsFlusherConfig},
metric::{EMPTY_TAGS, SortedTags},
};
use libdd_trace_obfuscation::obfuscation_config;
use reqwest::Client;
use std::{collections::hash_map, env, path::Path, str::FromStr, sync::Arc};
use tokio::time::Instant;
Expand Down Expand Up @@ -356,16 +344,16 @@ async fn extension_loop_active(
}
};

let (
trace_agent_channel,
let bottlecap::startup::TraceAgentPipeline {
trace_tx: trace_agent_channel,
trace_flusher,
trace_processor,
stats_flusher,
proxy_flusher,
trace_agent_shutdown_token,
shutdown_token: trace_agent_shutdown_token,
stats_concentrator,
trace_aggregator_handle,
) = start_trace_agent(
} = bottlecap::startup::start_trace_agent(
config,
&api_key_factory,
&tags_provider,
Expand Down Expand Up @@ -1092,120 +1080,6 @@ fn start_logs_agent(
)
}

#[allow(clippy::type_complexity)]
fn start_trace_agent(
config: &Arc<Config>,
api_key_factory: &Arc<ApiKeyFactory>,
tags_provider: &Arc<TagProvider>,
invocation_processor_handle: InvocationProcessorHandle,
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
client: &Client,
) -> (
Sender<SendDataBuilderInfo>,
Arc<trace_flusher::TraceFlusher>,
Arc<trace_processor::ServerlessTraceProcessor>,
Arc<stats_flusher::StatsFlusher>,
Arc<ProxyFlusher>,
tokio_util::sync::CancellationToken,
StatsConcentratorHandle,
TraceAggregatorHandle,
) {
// Build one shared hyper-based HTTP client for trace and stats flushing.
// This client type is required by libdd_trace_utils for SendData::send().
let trace_http_client = trace_http_client::create_client(
config.proxy_https.as_ref(),
config.tls_cert_file.as_ref(),
config.skip_ssl_validation,
)
.expect("Failed to create trace HTTP client");

// Stats
let (stats_concentrator_service, stats_concentrator_handle) =
StatsConcentratorService::new(Arc::clone(config));
tokio::spawn(stats_concentrator_service.run());
let stats_aggregator: Arc<TokioMutex<StatsAggregator>> = Arc::new(TokioMutex::new(
StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()),
));
let stats_flusher = Arc::new(stats_flusher::StatsFlusher::new(
api_key_factory.clone(),
stats_aggregator.clone(),
Arc::clone(config),
trace_http_client.clone(),
));

let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});

// Traces
let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default();
tokio::spawn(trace_aggregator_service.run());

let trace_flusher = Arc::new(trace_flusher::TraceFlusher::new(
trace_aggregator_handle.clone(),
config.clone(),
api_key_factory.clone(),
trace_http_client,
));

let obfuscation_config = obfuscation_config::ObfuscationConfig {
tag_replace_rules: config.apm_replace_tags.clone(),
http_remove_path_digits: config.apm_config_obfuscation_http_remove_paths_with_digits,
http_remove_query_string: config.apm_config_obfuscation_http_remove_query_string,
obfuscate_memcached: false,
obfuscation_redis_enabled: false,
obfuscation_redis_remove_all_args: false,
};

let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
obfuscation_config: Arc::new(obfuscation_config),
});

let (span_dedup_service, span_dedup_handle) = span_dedup_service::DedupService::new();
tokio::spawn(span_dedup_service.run());

// Proxy
let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::Aggregator::default()));
let proxy_flusher = Arc::new(ProxyFlusher::new(
api_key_factory.clone(),
Arc::clone(&proxy_aggregator),
Arc::clone(tags_provider),
Arc::clone(config),
client.clone(),
));

let trace_agent = trace_agent::TraceAgent::new(
Arc::clone(config),
trace_aggregator_handle.clone(),
trace_processor.clone(),
stats_aggregator,
stats_processor,
proxy_aggregator,
invocation_processor_handle,
appsec_processor,
Arc::clone(tags_provider),
stats_concentrator_handle.clone(),
span_dedup_handle,
);
let trace_agent_channel = trace_agent.get_sender_copy();
let shutdown_token = trace_agent.shutdown_token();

tokio::spawn(async move {
if let Err(e) = trace_agent.start().await {
error!("Error starting trace agent: {e:?}");
}
});

(
trace_agent_channel,
trace_flusher,
trace_processor,
stats_flusher,
proxy_flusher,
shutdown_token,
stats_concentrator_handle,
trace_aggregator_handle,
)
}

async fn start_dogstatsd(
tags_provider: Arc<TagProvider>,
api_key_factory: Arc<ApiKeyFactory>,
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub mod otlp;
pub mod proc;
pub mod proxy;
pub mod secrets;
pub mod startup;
pub mod tags;
pub mod traces;

Expand Down
Loading
Loading