diff --git a/ic-bn-lib/src/vector/client.rs b/ic-bn-lib/src/vector/client.rs index 1a70515..a58746f 100644 --- a/ic-bn-lib/src/vector/client.rs +++ b/ic-bn-lib/src/vector/client.rs @@ -13,9 +13,8 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use http::header::{AUTHORIZATION, CONTENT_ENCODING, CONTENT_TYPE}; use ic_bn_lib_common::{traits::http::Client as HttpClient, types::vector::VectorCli}; use prometheus::{ - Histogram, IntCounter, IntCounterVec, IntGauge, Registry, register_histogram_with_registry, - register_int_counter_vec_with_registry, register_int_counter_with_registry, - register_int_gauge_with_registry, + HistogramVec, IntCounterVec, IntGaugeVec, Registry, register_histogram_vec_with_registry, + register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, }; use reqwest::{Method, Request, header::HeaderValue}; use serde_json::Value; @@ -34,85 +33,94 @@ pub const MB: f64 = 1024.0 * KB; const CONTENT_ENCODING_ZSTD: HeaderValue = hval!("zstd"); #[derive(Clone)] -struct Metrics { - sent: IntCounter, - sent_compressed: IntCounter, - sent_events: IntCounter, - buffer_event_size: IntGauge, - batch_size: IntGauge, - buffer_drops: IntCounter, - encoding_failures: IntCounter, - batch_buffer_size: IntGauge, - batch_flush_retries: IntCounter, +pub struct Metrics { + sent: IntCounterVec, + sent_compressed: IntCounterVec, + sent_events: IntCounterVec, + buffer_event_size: IntGaugeVec, + batch_size: IntGaugeVec, + buffer_drops: IntCounterVec, + encoding_failures: IntCounterVec, + batch_buffer_size: IntGaugeVec, + batch_flush_retries: IntCounterVec, batch_flushes: IntCounterVec, - batch_queue_duration: Histogram, - batch_encode_duration: Histogram, - batch_flush_duration: Histogram, - batch_sizes: Histogram, + batch_queue_duration: HistogramVec, + batch_encode_duration: HistogramVec, + batch_flush_duration: HistogramVec, + batch_sizes: HistogramVec, } impl Metrics { pub fn new(registry: &Registry) -> Self { Self { - sent: register_int_counter_with_registry!( + sent: register_int_counter_vec_with_registry!( format!("vector_sent"), format!("Number of bytes sent"), + &["namespace"], registry ) .unwrap(), - sent_compressed: register_int_counter_with_registry!( + sent_compressed: register_int_counter_vec_with_registry!( format!("vector_sent_compressed"), format!("Number of bytes sent (compressed)"), + &["namespace"], registry ) .unwrap(), - sent_events: register_int_counter_with_registry!( + sent_events: register_int_counter_vec_with_registry!( format!("vector_sent_events"), format!("Number of events sent"), + &["namespace"], registry ) .unwrap(), - buffer_event_size: register_int_gauge_with_registry!( + buffer_event_size: register_int_gauge_vec_with_registry!( format!("vector_event_buffer_size"), format!("Number of events in the incoming buffer"), + &["namespace"], registry ) .unwrap(), - batch_size: register_int_gauge_with_registry!( + batch_size: register_int_gauge_vec_with_registry!( format!("vector_batch_size"), format!("Current size of the events queued for the next batch"), + &["namespace"], registry ) .unwrap(), - buffer_drops: register_int_counter_with_registry!( + buffer_drops: register_int_counter_vec_with_registry!( format!("vector_buffer_drops"), format!("Number of events that were dropped due to buffer overflow"), + &["namespace"], registry ) .unwrap(), - encoding_failures: register_int_counter_with_registry!( + encoding_failures: register_int_counter_vec_with_registry!( format!("vector_encoding_failures"), format!("Number of events that were dropped due to encoding failure"), + &["namespace"], registry ) .unwrap(), - batch_buffer_size: register_int_gauge_with_registry!( + batch_buffer_size: register_int_gauge_vec_with_registry!( format!("vector_batch_buffer_size"), format!("Number of batches in the outgoing buffer"), + &["namespace"], registry ) .unwrap(), - batch_flush_retries: register_int_counter_with_registry!( + batch_flush_retries: register_int_counter_vec_with_registry!( format!("vector_batch_flush_retries"), format!("Number of batch flush retries"), + &["namespace"], registry ) .unwrap(), @@ -120,38 +128,42 @@ impl Metrics { batch_flushes: register_int_counter_vec_with_registry!( format!("vector_batch_flushes"), format!("Count of batch flushes"), - &["ok"], + &["namespace", "ok"], registry ) .unwrap(), - batch_queue_duration: register_histogram_with_registry!( + batch_queue_duration: register_histogram_vec_with_registry!( format!("vector_batch_queue_duration"), format!("Time it takes to queue the batch"), + &["namespace"], vec![0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2], registry ) .unwrap(), - batch_encode_duration: register_histogram_with_registry!( + batch_encode_duration: register_histogram_vec_with_registry!( format!("vector_batch_encode_duration"), format!("Time it takes to encode the batch"), + &["namespace"], vec![0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2], registry ) .unwrap(), - batch_flush_duration: register_histogram_with_registry!( + batch_flush_duration: register_histogram_vec_with_registry!( format!("vector_batch_flush_duration"), format!("Time it takes to flush the batch"), + &["namespace"], vec![0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2], registry ) .unwrap(), - batch_sizes: register_histogram_with_registry!( + batch_sizes: register_histogram_vec_with_registry!( format!("vector_batch_sizes"), format!("Batch sizes histogram"), + &["namespace"], vec![ 128.0 * KB, 256.0 * KB, @@ -198,6 +210,7 @@ pub fn encode_batch(batch: Vec) -> Result { } pub struct Vector { + namespace: String, token_batcher: CancellationToken, token_flushers: CancellationToken, token_flushers_drain: CancellationToken, @@ -208,14 +221,27 @@ pub struct Vector { } impl Vector { - pub fn new(cli: &VectorCli, client: Arc, registry: &Registry) -> Self { + pub fn new( + cli: &VectorCli, + client: Arc, + namespace: &str, + registry: &Registry, + ) -> Self { + let metrics = Metrics::new(registry); + Self::new_with_metrics(cli, client, namespace, metrics) + } + + pub fn new_with_metrics( + cli: &VectorCli, + client: Arc, + namespace: &str, + metrics: Metrics, + ) -> Self { let cli = cli.clone(); let (tx_event, rx_event) = mpsc::channel(cli.log_vector_buffer); let (tx_batch, rx_batch) = async_channel::bounded(cli.log_vector_batch_queue); - let metrics = Metrics::new(registry); - // Start batcher warn!("Vector: starting batcher"); let token_batcher = CancellationToken::new(); @@ -230,6 +256,7 @@ impl Vector { batch: Vec::with_capacity(cli.log_vector_batch), interval, token: token_batcher.child_token(), + namespace: namespace.to_string(), metrics: metrics.clone(), }; @@ -262,6 +289,7 @@ impl Vector { retry_interval: cli.log_vector_retry_interval, retry_count: cli.log_vector_retry_count, timeout: cli.log_vector_timeout, + namespace: namespace.to_string(), metrics: metrics.clone(), }; @@ -271,6 +299,7 @@ impl Vector { } Self { + namespace: namespace.to_string(), token_batcher, token_flushers, token_flushers_drain, @@ -283,10 +312,18 @@ impl Vector { pub fn send(&self, event: Value) { // If it fails we'll lose the event, but it's better than to block & eat memory. + if self.tx.try_send(event).is_err() { - self.metrics.buffer_drops.inc(); + self.metrics + .buffer_drops + .with_label_values(&[&self.namespace]) + .inc(); } else { - self.metrics.buffer_event_size.inc(); + let len = self.tx.max_capacity() - self.tx.capacity(); + self.metrics + .buffer_event_size + .with_label_values(&[&self.namespace]) + .set(len as i64); }; } @@ -316,6 +353,7 @@ struct Batcher { batch: Vec, interval: Interval, token: CancellationToken, + namespace: String, metrics: Metrics, } @@ -328,7 +366,16 @@ impl Display for Batcher { impl Batcher { async fn add_to_batch(&mut self, event: Value) { self.batch.push(event); - self.metrics.batch_size.set(self.batch.len() as i64); + + let len = self.rx.max_capacity() - self.rx.capacity(); + self.metrics + .buffer_event_size + .with_label_values(&[&self.namespace]) + .set(len as i64); + self.metrics + .batch_size + .with_label_values(&[&self.namespace]) + .set(self.batch.len() as i64); // If we've reached the capacity - it's time to flush if self.batch.len() == self.batch.capacity() { @@ -355,8 +402,18 @@ impl Batcher { debug!("{self}: batch ({len} events) queued in {dur}s"); - self.metrics.batch_queue_duration.observe(dur); - self.metrics.batch_buffer_size.inc(); + self.metrics + .batch_size + .with_label_values(&[&self.namespace]) + .set(0); + self.metrics + .batch_queue_duration + .with_label_values(&[&self.namespace]) + .observe(dur); + self.metrics + .batch_buffer_size + .with_label_values(&[&self.namespace]) + .inc(); } async fn drain(&mut self) { @@ -389,7 +446,6 @@ impl Batcher { }, Some(event) = self.rx.recv() => { - self.metrics.buffer_event_size.dec(); self.add_to_batch(event).await; } } @@ -409,6 +465,7 @@ struct Flusher { zstd_level: usize, token: CancellationToken, token_drain: CancellationToken, + namespace: String, metrics: Metrics, } @@ -473,7 +530,10 @@ impl Flusher { // Bytes is cheap to clone if let Err(e) = self.send_batch(batch.clone(), timeout).await { - self.metrics.batch_flushes.with_label_values(&["no"]).inc(); + self.metrics + .batch_flushes + .with_label_values(&[&self.namespace, "no"]) + .inc(); warn!( "{self}: unable to send (try {}, retry interval {}s): {e:#}", @@ -481,9 +541,18 @@ impl Flusher { interval.as_secs_f64() ); } else { - self.metrics.sent.inc_by(raw_size as u64); - self.metrics.sent_compressed.inc_by(batch.len() as u64); - self.metrics.batch_flushes.with_label_values(&["yes"]).inc(); + self.metrics + .sent + .with_label_values(&[&self.namespace]) + .inc_by(raw_size as u64); + self.metrics + .sent_compressed + .with_label_values(&[&self.namespace]) + .inc_by(batch.len() as u64); + self.metrics + .batch_flushes + .with_label_values(&[&self.namespace, "yes"]) + .inc(); debug!("{self}: batch sent in {}s", start.elapsed().as_secs_f64()); return Ok(()); @@ -493,7 +562,10 @@ impl Flusher { interval = (interval + self.retry_interval).min(self.retry_interval * 5); timeout = (timeout + self.timeout).min(self.timeout * 10); - self.metrics.batch_flush_retries.inc(); + self.metrics + .batch_flush_retries + .with_label_values(&[&self.namespace]) + .inc(); retries += 1; // Limit the retry count and reset the interval/timeout if we're draining. @@ -517,7 +589,10 @@ impl Flusher { #[allow(clippy::cognitive_complexity)] async fn process_batch(&self, batch: Batch) { let len = batch.events.len(); - self.metrics.batch_buffer_size.dec(); + self.metrics + .batch_buffer_size + .with_label_values(&[&self.namespace]) + .dec(); debug!("{self}: received batch ({len} events)"); @@ -527,25 +602,36 @@ impl Flusher { Ok(v) => { self.metrics .batch_encode_duration + .with_label_values(&[&self.namespace]) .observe(start.elapsed().as_secs_f64()); - self.metrics.batch_sizes.observe(v.len() as f64); + self.metrics + .batch_sizes + .with_label_values(&[&self.namespace]) + .observe(v.len() as f64); // Send it let start = Instant::now(); if let Err(e) = self.send_batch_retry(v).await { warn!("{self}: unable to flush: {e:#}"); } else { - self.metrics.sent_events.inc_by(len as u64); + self.metrics + .sent_events + .with_label_values(&[&self.namespace]) + .inc_by(len as u64); }; self.metrics .batch_flush_duration + .with_label_values(&[&self.namespace]) .observe(start.elapsed().as_secs_f64()); debug!("{self}: {len} events flushed"); } Err(e) => { - self.metrics.encoding_failures.inc(); + self.metrics + .encoding_failures + .with_label_values(&[&self.namespace]) + .inc(); warn!("{self}: unable to encode batch: {e:#}") } }; @@ -689,7 +775,7 @@ mod test { let cli = make_cli(); let client = Arc::new(TestClient(AtomicU64::new(0), AtomicU64::new(0))); - let vector = Vector::new(&cli, client.clone(), &Registry::new()); + let vector = Vector::new(&cli, client.clone(), "", &Registry::new()); for i in 0..5000 { let event = json!({ @@ -714,7 +800,7 @@ mod test { cli.log_vector_flushers = 32; let client = Arc::new(TestClientOk); - let vector = Vector::new(&cli, client, &Registry::new()); + let vector = Vector::new(&cli, client, "", &Registry::new()); for _ in 0..cli.log_vector_buffer { let event = json!({ @@ -764,7 +850,7 @@ mod test { vector.stop().await; assert_eq!( - vector.metrics.sent_events.get(), + vector.metrics.sent_events.with_label_values(&[""]).get(), cli.log_vector_buffer as u64, ); } @@ -775,7 +861,7 @@ mod test { let cli = make_cli(); let client = Arc::new(TestClientDead); - let vector = Vector::new(&cli, client, &Registry::new()); + let vector = Vector::new(&cli, client, "", &Registry::new()); for i in 0..6000 { let event = json!({