Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ darling = "0.23.0"
erased-serde = "0.4.10"
futures-util = "0.3.32"
governor = "0.10.4"
hex = "0.4.3"
http = "1.4.0"
http-body-util = "0.1.3"
hyper = { version = "1.9.0", default-features = false }
Expand All @@ -70,6 +71,7 @@ proc-macro2 = { version = "1.0.106", default-features = false }
prometheus = { version = "0.14.0", default-features = false }
prometheus-client = "0.18.1"
prometools = "0.2.3"
prost = "0.14"
rand = "0.10.1"
percent-encoding = "2.3.2"
quote = "1.0.45"
Expand Down Expand Up @@ -120,3 +122,7 @@ tower = "0.5.3"
# Only used in deprecated `foundations` code
# TODO: remove before next major release
sentry-core = { version = "0.36.0", default-features = false }

# TEMPORARY
[patch.crates-io]
cf-rustracing = { git = "https://github.com/mar-cf/rustracing.git", branch = "user-tracing" }
Comment on lines +126 to +128

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

84 changes: 83 additions & 1 deletion foundations-macros/src/span_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ struct Options {

#[darling(default = "Options::default_generic")]
generic: bool,

#[darling(default = "Options::default_user")]
user: bool,
}

impl Options {
Expand All @@ -61,6 +64,10 @@ impl Options {
fn default_generic() -> bool {
cfg!(foundations_generic_telemetry_wrapper)
}

fn default_user() -> bool {
false
}
}

struct Args {
Expand Down Expand Up @@ -119,9 +126,10 @@ fn expand_from_parsed(args: Args, item_fn: ItemFn) -> TokenStream2 {
None => try_async_trait_fn_rewrite(&args, &block).unwrap_or_else(|| {
let span_name = args.span_name.as_tokens();
let crate_path = &args.options.crate_path;
let user_span = with_user_span_call(&args.options);

quote!(
let __span = #crate_path::telemetry::tracing::span(#span_name);
let __span = #crate_path::telemetry::tracing::span(#span_name)#user_span;
#block
)
}),
Expand Down Expand Up @@ -191,15 +199,26 @@ fn wrap_with_span(args: &Args, block: TokenStream2) -> TokenStream2 {

let span_name = args.span_name.as_tokens();
let crate_path = &args.options.crate_path;
let user_span = with_user_span_call(&args.options);

quote!(
#crate_path::telemetry::tracing::span(#span_name)
#user_span
.into_context()
.#apply_fn(#block)
.await
)
}

/// Emits `.with_user_span()` when `user = true`, otherwise nothing.
fn with_user_span_call(options: &Options) -> TokenStream2 {
if options.user {
quote!(.with_user_span())
} else {
quote!()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -236,6 +255,36 @@ mod tests {
assert_eq!(actual, expected);
}

#[test]
fn expand_sync_fn_user() {
let args = parse_attr! {
#[span_fn("sync_span", user = true)]
};

let item_fn = parse_quote! {
fn do_sync() -> io::Result<String> {
do_something_else();

Ok("foo".into())
}
};

let actual = expand_from_parsed(args, item_fn).to_string();

let expected = code_str! {
fn do_sync<>() -> io::Result<String> {
let __span = ::foundations::telemetry::tracing::span("sync_span").with_user_span();
{
do_something_else();

Ok("foo".into())
}
}
};

assert_eq!(actual, expected);
}

#[test]
fn expand_sync_fn_const_span_name() {
let args = parse_attr! {
Expand Down Expand Up @@ -298,6 +347,39 @@ mod tests {
assert_eq!(actual, expected);
}

#[test]
fn expand_async_fn_user() {
let args = parse_attr! {
#[span_fn("async_span", user = true)]
};

let item_fn = parse_quote! {
async fn do_async() -> io::Result<String> {
do_something_else().await;

Ok("foo".into())
}
};

let actual = expand_from_parsed(args, item_fn).to_string();

let expected = code_str! {
async fn do_async<>() -> io::Result<String> {
::foundations::telemetry::tracing::span("async_span")
.with_user_span()
.into_context()
.apply(async move {{
do_something_else().await;

Ok("foo".into())
}})
.await
}
};

assert_eq!(actual, expected);
}

#[test]
fn expand_async_fn_local() {
let args = parse_attr! {
Expand Down
15 changes: 15 additions & 0 deletions foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,19 @@ tracing = [
"dep:crossbeam-utils",
]

# Enables distributed user tracing functionality.
user-tracing = [
"tracing",
"dep:hex",
"dep:hyper",
"hyper/client",
"dep:hyper-util",
"dep:http-body-util",
"dep:serde_json",
"dep:prost",
"tokio/net",
]

# Enables metrics functionality.
metrics = [
"dep:foundations-macros",
Expand Down Expand Up @@ -223,6 +236,7 @@ crossbeam-utils = { workspace = true, optional = true }
erased-serde = { workspace = true, optional = true }
futures-util = { workspace = true, optional = true }
governor = { workspace = true, optional = true }
hex = { workspace = true, optional = true }
http = { workspace = true, optional = true }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, optional = true, features = ["http1", "server"] }
Expand All @@ -237,6 +251,7 @@ percent-encoding = { workspace = true, optional = true }
prometheus = { workspace = true, optional = true, features = ["process"] }
prometheus-client = { workspace = true, optional = true }
prometools = { workspace = true, optional = true, features = ["serde"] }
prost = { workspace = true, optional = true }
rand = { workspace = true, optional = true }
serde = { workspace = true, optional = true, features = ["derive", "rc"] }
serde_json = { workspace = true, optional = true }
Expand Down
27 changes: 26 additions & 1 deletion foundations/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ mod scope;

mod telemetry_context;

#[cfg(all(feature = "tracing", feature = "telemetry-otlp-grpc"))]
#[cfg(all(
feature = "tracing",
any(feature = "telemetry-otlp-grpc", feature = "user-tracing")
))]
mod otlp_conversion;

#[cfg(feature = "testing")]
Expand Down Expand Up @@ -139,6 +142,12 @@ feature_use!(cfg(feature = "tracing"), {
});
});

#[cfg(feature = "user-tracing")]
use self::tracing::UserSpanScope;

#[cfg(all(feature = "user-tracing", feature = "testing"))]
use self::tracing::testing::UserTestTracerScope;

#[cfg(feature = "logging")]
use self::log::internal::LogScope;

Expand Down Expand Up @@ -260,11 +269,17 @@ pub struct TelemetryScope {
#[cfg(feature = "tracing")]
_span_scope: Option<SpanScope>,

#[cfg(feature = "user-tracing")]
_user_span_scope: Option<UserSpanScope>,

// NOTE: certain tracing APIs start a new trace, so we need to scope the test tracer
// for them to use the tracer from the test scope instead of production tracer in
// the harness.
#[cfg(all(feature = "tracing", feature = "testing"))]
_test_tracer_scope: Option<TestTracerScope>,

#[cfg(all(feature = "user-tracing", feature = "testing"))]
_user_test_tracer_scope: Option<UserTestTracerScope>,
}

/// Telemetry configuration that is passed to [`init`].
Expand Down Expand Up @@ -347,6 +362,16 @@ pub fn init(config: TelemetryConfig) -> BootstrapResult<TelemetryDriver> {
}
}

#[cfg(feature = "user-tracing")]
{
if let Some(user_settings) = &config.settings.user_tracing {
let initializer = self::tracing::init::init_user(config.service_info, user_settings)?;
if let Some(fut) = initializer {
tele_futures.push(fut);
}
}
}

TELEMETRY_INITIALIZED.store(true, Ordering::Relaxed);

#[cfg(feature = "telemetry-server")]
Expand Down
10 changes: 10 additions & 0 deletions foundations/src/telemetry/settings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ mod otlp_grpc_output;
#[cfg(feature = "tracing")]
mod tracing;

#[cfg(feature = "user-tracing")]
mod user_tracing;

#[cfg(feature = "logging")]
mod logging;

Expand All @@ -27,6 +30,9 @@ pub use self::otlp_grpc_output::*;
#[cfg(feature = "tracing")]
pub use self::tracing::*;

#[cfg(feature = "user-tracing")]
pub use self::user_tracing::*;

#[cfg(feature = "logging")]
pub use self::logging::*;

Expand All @@ -53,6 +59,10 @@ pub struct TelemetrySettings {
#[cfg(feature = "tracing")]
pub tracing: TracingSettings,

/// Distributed user tracing settings
#[cfg(feature = "user-tracing")]
pub user_tracing: Option<UserTracingSettings>,

/// Logging settings.
#[cfg(feature = "logging")]
pub logging: LoggingSettings,
Expand Down
110 changes: 110 additions & 0 deletions foundations/src/telemetry/settings/user_tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use std::num::NonZeroUsize;

#[cfg(feature = "settings")]
use crate::settings::settings;

/// Distributed user tracing settings.
#[cfg_attr(feature = "settings", settings(crate_path = "crate"))]
#[cfg_attr(not(feature = "settings"), derive(Clone, Debug, serde::Deserialize))]
pub struct UserTracingSettings {
/// Enables user tracing.
#[serde(default = "UserTracingSettings::default_enabled")]
pub enabled: bool,

/// Maximum number of spans to buffer for output. Any spans above
/// this limit will be dropped until the queue regains capacity.
///
/// The default is to buffer up to 1 million spans in memory. This protects
/// services from out-of-memory errors when the output gets heavily backed up.
/// To disable the limit entirely, set this setting to `None`.
#[serde(default = "UserTracingSettings::default_max_queue_size")]
pub max_queue_size: Option<NonZeroUsize>,

/// The output for the collected user traces.
pub output: UserTracesOutput,
}

#[cfg(not(feature = "settings"))]
impl Default for UserTracingSettings {
fn default() -> Self {
Self {
enabled: UserTracingSettings::default_enabled(),
max_queue_size: UserTracingSettings::default_max_queue_size(),
output: Default::default(),
}
}
}

impl UserTracingSettings {
fn default_enabled() -> bool {
true
}

const fn default_max_queue_size() -> Option<NonZeroUsize> {
Some(const { NonZeroUsize::new(1_000_000).expect("1_000_000 is not zero") })
}
}

/// The output for the collected user traces.
#[cfg_attr(
feature = "settings",
settings(crate_path = "crate", impl_default = false)
)]
#[cfg_attr(not(feature = "settings"), derive(Clone, Debug, serde::Deserialize))]
pub enum UserTracesOutput {
/// Send user tracing spans as OTLP over a Unix domain socket to an OTLP endpoint.
OtlpUds(OtlpUdsOutputSettings),
}

impl Default for UserTracesOutput {
fn default() -> Self {
Self::OtlpUds(Default::default())
}
}

/// [OTLP over UDS] output settings for user tracing.
///
/// Sends trace data as protobuf-encoded OTLP over HTTP to a Unix domain socket.
#[cfg_attr(feature = "settings", settings(crate_path = "crate"))]
#[cfg_attr(not(feature = "settings"), derive(Clone, Debug, serde::Deserialize))]
pub struct OtlpUdsOutputSettings {
/// Path to the Unix domain socket for the OTLP endpoint.
pub socket_path: String,

/// Number of concurrent worker tasks for user trace export.
///
/// # Default
///
/// Default value is `2`.
#[serde(default = "OtlpUdsOutputSettings::default_num_tasks")]
pub num_tasks: usize,

/// Maximum number of spans to drain per batch.
///
/// # Default
///
/// Default value is `512`.
#[serde(default = "OtlpUdsOutputSettings::default_max_batch_size")]
pub max_batch_size: usize,
}

#[cfg(not(feature = "settings"))]
impl Default for OtlpUdsOutputSettings {
fn default() -> Self {
Self {
socket_path: String::new(),
num_tasks: OtlpUdsOutputSettings::default_num_tasks(),
max_batch_size: OtlpUdsOutputSettings::default_max_batch_size(),
}
}
}

impl OtlpUdsOutputSettings {
const fn default_num_tasks() -> usize {
2
}

const fn default_max_batch_size() -> usize {
512
}
}
Loading
Loading