diff --git a/ic-bn-lib/src/lib.rs b/ic-bn-lib/src/lib.rs index b854cbe..b0f629e 100644 --- a/ic-bn-lib/src/lib.rs +++ b/ic-bn-lib/src/lib.rs @@ -20,7 +20,7 @@ pub mod utils; #[cfg(feature = "vector")] pub mod vector; -use std::{fs::File, path::Path}; +use std::{fs::File, net::IpAddr, path::Path}; use anyhow::{Context, anyhow}; use bytes::Bytes; @@ -171,6 +171,28 @@ macro_rules! retry_async { }; } +/// Returns family of an IP address +pub trait IpFamily { + fn family(&self) -> &'static str; +} + +impl IpFamily for IpAddr { + fn family(&self) -> &'static str { + if self.is_ipv4() { "v4" } else { "v6" } + } +} + +/// Converts bool to yes/no static str +pub trait BoolYesNo { + fn yesno(&self) -> &'static str; +} + +impl BoolYesNo for bool { + fn yesno(&self) -> &'static str { + if *self { "yes" } else { "no" } + } +} + #[macro_export] macro_rules! dyn_event { ($lvl:ident, $($arg:tt)+) => { diff --git a/ic-bn-lib/src/smtp/cli.rs b/ic-bn-lib/src/smtp/cli.rs index 49c4c11..53f8c14 100644 --- a/ic-bn-lib/src/smtp/cli.rs +++ b/ic-bn-lib/src/smtp/cli.rs @@ -43,8 +43,9 @@ pub struct SmtpServerCli { #[clap(env, long, default_value = "5")] pub smtp_server_max_errors_per_session: usize, - /// Maximum message body size - #[clap(env, long, default_value = "2MB", value_parser = parse_size)] + /// Maximum message body size. + /// Default accounts for max IC message size + some overhead. + #[clap(env, long, default_value = "1950KB", value_parser = parse_size)] pub smtp_server_max_message_size: u64, /// How much data can be ingested during a single SMTP session diff --git a/ic-bn-lib/src/smtp/ic/candid.rs b/ic-bn-lib/src/smtp/ic/candid.rs index 2a66fe9..7a872c4 100644 --- a/ic-bn-lib/src/smtp/ic/candid.rs +++ b/ic-bn-lib/src/smtp/ic/candid.rs @@ -36,6 +36,7 @@ pub struct SmtpRequest { pub message: Option, pub envelope: Option, pub gateway_flags: Option>, + pub message_id: Option, } /// Candid `SmtpRequestError` (`code` is `nat64` on the wire in typical canisters). diff --git a/ic-bn-lib/src/smtp/ic/delivery_agent.rs b/ic-bn-lib/src/smtp/ic/delivery_agent.rs index cdc7aa8..73594cd 100644 --- a/ic-bn-lib/src/smtp/ic/delivery_agent.rs +++ b/ic-bn-lib/src/smtp/ic/delivery_agent.rs @@ -1,32 +1,41 @@ -use std::{fmt::Display, str::FromStr, sync::Arc, time::Duration}; +use std::{ + fmt::Display, + str::FromStr, + sync::Arc, + time::{Duration, Instant}, +}; use ahash::{AHashMap, RandomState}; use async_trait::async_trait; use candid::Principal; -use futures::future::try_join_all; +use futures::future::join_all; use http::Method; use ic_agent::{Agent, AgentError}; use ic_bn_lib_common::traits::http::Client; use moka::sync::Cache; +use strum::IntoStaticStr; use tracing::{debug, info}; use url::Url; use crate::{ + BoolYesNo, custom_domains::LooksUpCustomDomain, smtp::{ DeliversMail, DeliveryError, EmailMessage, RecipientPolicy, RecipientResolveError, ResolvesRecipient, address::EmailAddress, ic::{ - ExecutesIcSmtpRequest, IcSmtpRequestExecutor, + ExecutesIcSmtpRequest, IcSmtpRequestExecutor, Metrics, candid::{Envelope, SmtpRequest, SmtpResponse}, parse_email, }, inbound::SessionMeta, }, + truncate, }; -#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, Debug, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] pub enum IcSmtpDeliveryAgentError { #[error("IC Agent error: {0}")] Agent(#[from] ic_agent::AgentError), @@ -43,6 +52,7 @@ pub struct IcSmtpDeliveryAgent { http_client: Arc, ic_base_domain: String, smtp_canister_id_cache: Cache, + metrics: Metrics, } impl Display for IcSmtpDeliveryAgent { @@ -60,6 +70,7 @@ impl IcSmtpDeliveryAgent { ic_base_domain: &str, cache_ttl: Duration, cache_capacity: u64, + metrics: Metrics, ) -> Self { let smtp_canister_id_cache = Cache::builder() .time_to_live(cache_ttl) @@ -72,6 +83,7 @@ impl IcSmtpDeliveryAgent { http_client, ic_base_domain: ic_base_domain.into(), smtp_canister_id_cache, + metrics, } } @@ -83,6 +95,7 @@ impl IcSmtpDeliveryAgent { ic_base_domain: &str, cache_ttl: Duration, cache_capacity: u64, + metrics: Metrics, ) -> Self { let request_executor = Arc::new(IcSmtpRequestExecutor::new(agent)); @@ -93,9 +106,39 @@ impl IcSmtpDeliveryAgent { ic_base_domain, cache_ttl, cache_capacity, + metrics, ) } + fn observe_canister_lookup( + &self, + success: bool, + custom_domain: bool, + smtp_canister: bool, + cached: bool, + elapsed: Duration, + ) { + self.metrics + .canister_id_lookups + .with_label_values(&[ + success.yesno(), + custom_domain.yesno(), + smtp_canister.yesno(), + cached.yesno(), + ]) + .inc(); + + self.metrics + .canister_id_lookup_latency + .with_label_values(&[ + success.yesno(), + custom_domain.yesno(), + smtp_canister.yesno(), + cached.yesno(), + ]) + .observe(elapsed.as_secs_f64()); + } + /// Executes an HTTP request to the canister to get the SMTP canister id async fn lookup_smtp_canister_id(&self, canister_id: Principal) -> Option { let url = Url::parse(&format!( @@ -141,22 +184,23 @@ impl IcSmtpDeliveryAgent { } Err(e) => { // Sanitize a bit - let mut body_str = body_str.replace("\r", " ").replace("\n", " "); - body_str.truncate(128); + let body_str = body_str.replace("\r", " ").replace("\n", " "); + let body_str = truncate(&body_str, 128); info!("{self}: {canister_id}: Incorrect SMTP canister ID: '{body_str}': {e:#}"); None } } } - /// Resolves SMTP canister ID for the given canister_id - async fn resolve_smtp_canister_id(&self, canister_id: Principal) -> Principal { + /// Resolves SMTP canister ID for the given canister_id. + /// Returns also if it was obtained from the cache. + async fn resolve_smtp_canister_id(&self, canister_id: Principal) -> (Principal, bool) { debug!("{self}: {canister_id}: Looking up SMTP canister ID"); // Try to find SMTP canister ID, check the cache first if let Some(v) = self.smtp_canister_id_cache.get(&canister_id) { debug!("{self}: {canister_id}: SMTP canister ID found in cache: {v}"); - return v; + return (v, true); } // Otherwise do a lookup with a fallback to canister_id @@ -173,13 +217,15 @@ impl IcSmtpDeliveryAgent { .insert(canister_id, smtp_canister_id); debug!("{self}: {canister_id}: SMTP canister ID obtained: {smtp_canister_id}"); - smtp_canister_id + (smtp_canister_id, false) } /// Resolves destination SMTP canister id for the given address async fn resolve_canister_id(&self, address: &EmailAddress) -> Option { debug!("{self}: {address}: resolving SMTP canister ID"); + let start = Instant::now(); + let mut custom_domain = false; // First check if the target domain has a canister as 1st label. // This covers addresses like "foo@qoctq-giaaa-aaaaa-aaaea-cai.icp0.io" let lbl = address.domain().labels().next()?; @@ -194,15 +240,26 @@ impl IcSmtpDeliveryAgent { .lookup_custom_domain(address.domain()) .inspect(|x| { debug!("{self}: {address}: found custom domain canister ID: {x}"); + custom_domain = true; }) }) else { debug!("{self}: {address}: unable to resolve canister ID"); + self.observe_canister_lookup(false, false, false, false, start.elapsed()); return None; }; // Finally check if there's an SMTP canister ID defined - Some(self.resolve_smtp_canister_id(canister_id).await) + let (smtp_canister_id, cached) = self.resolve_smtp_canister_id(canister_id).await; + self.observe_canister_lookup( + true, + custom_domain, + smtp_canister_id != canister_id, + cached, + start.elapsed(), + ); + + Some(smtp_canister_id) } /// Delivers given SMTP request to the canister @@ -249,8 +306,11 @@ impl DeliversMail for IcSmtpDeliveryAgent { message: Arc, ) -> Result<(), DeliveryError> { info!( - "{self}: delivering mail, ehlo: {:?}, from: '{}', to: '{:?}', id '{}'", - meta.ehlo_hostname, message.mail_from, message.rcpt_to, message.id + "{self}: delivering mail, ehlo: {}, from: '{}', to: '{:?}', id '{}'", + meta.ehlo_hostname.unwrap_or_default(), + message.mail_from, + message.rcpt_to, + message.id ); // A single message can be (potentially) destined for several canisters/domains. @@ -288,13 +348,35 @@ impl DeliversMail for IcSmtpDeliveryAgent { }), message: Some(parsed.clone()), gateway_flags: None, + message_id: Some(message.id.to_string()), }; - futs.push(self.smtp_request_deliver(canister_id, ic_smtp_request)); + futs.push(async move { + let start = Instant::now(); + let res = self + .smtp_request_deliver(canister_id, ic_smtp_request) + .await; + + let error_lbl: &'static str = if let Err(e) = &res { e.into() } else { "" }; + self.metrics + .smtp_requests + .with_label_values(&["no", error_lbl]) + .inc(); + self.metrics + .smtp_request_latency + .with_label_values(&["no", error_lbl]) + .observe(start.elapsed().as_secs_f64()); + + res + }); } - try_join_all(futs).await?; - Ok(()) + // Find & return 1st error if there's any + join_all(futs) + .await + .into_iter() + .find(|x| x.is_err()) + .unwrap_or(Ok(())) } } @@ -320,9 +402,11 @@ impl ResolvesRecipient for IcSmtpDeliveryAgent { }), message: None, gateway_flags: None, + message_id: None, }; - let ic_smtp_response = self + let start = Instant::now(); + let res = self .request_executor .canister_request(canister_id, ic_smtp_request, true) .await @@ -338,9 +422,19 @@ impl ResolvesRecipient for IcSmtpDeliveryAgent { } _ => RecipientResolveError::Temporary(e.to_string()), - })?; - - if let SmtpResponse::Err(e) = ic_smtp_response { + }); + + let error_lbl: &'static str = if let Err(e) = &res { e.into() } else { "" }; + self.metrics + .smtp_requests + .with_label_values(&["yes", error_lbl]) + .inc(); + self.metrics + .smtp_request_latency + .with_label_values(&["yes", error_lbl]) + .observe(start.elapsed().as_secs_f64()); + + if let SmtpResponse::Err(e) = res? { info!( "{self}: {canister_id}: failed to resolve recipient: {} {}", e.code, e.message @@ -385,6 +479,7 @@ mod tests { use fqdn::{FQDN, fqdn}; use ic_bn_lib_common::principal; use indoc::indoc; + use prometheus::Registry; use uuid::Uuid; #[derive(Debug)] @@ -501,6 +596,7 @@ mod tests { "icp0.io", Duration::from_secs(10), 10, + Metrics::new(&Registry::new()), ), http_client, request_executor, @@ -691,6 +787,7 @@ mod tests { body: body.as_bytes().to_vec(), }), gateway_flags: None, + message_id: Some(Uuid::nil().to_string()), } }; diff --git a/ic-bn-lib/src/smtp/ic/mod.rs b/ic-bn-lib/src/smtp/ic/mod.rs index 888bd6e..7e6d0d7 100644 --- a/ic-bn-lib/src/smtp/ic/mod.rs +++ b/ic-bn-lib/src/smtp/ic/mod.rs @@ -6,6 +6,10 @@ use async_trait::async_trait; use derive_new::new; use ic_agent::Agent; use mail_parser::MessageParser; +use prometheus::{ + HistogramVec, IntCounterVec, Registry, register_histogram_vec_with_registry, + register_int_counter_vec_with_registry, +}; use tracing::debug; use crate::smtp::ic::{ @@ -71,6 +75,58 @@ impl Display for IcSmtpRequestExecutor { } } +#[derive(Clone, Debug)] +pub struct Metrics { + canister_id_lookups: IntCounterVec, + canister_id_lookup_latency: HistogramVec, + smtp_requests: IntCounterVec, + smtp_request_latency: HistogramVec, +} + +impl Metrics { + pub fn new(registry: &Registry) -> Self { + const CANISTER_LABELS: &[&str] = + &["success", "custom_domain", "is_smtp_canister", "cached"]; + const REQUEST_LABELS: &[&str] = &["validate", "error"]; + + Self { + canister_id_lookups: register_int_counter_vec_with_registry!( + format!("smtp_ic_agent_canister_id_lookups"), + format!("Number of canister ID lookups"), + CANISTER_LABELS, + registry + ) + .unwrap(), + + canister_id_lookup_latency: register_histogram_vec_with_registry!( + format!("smtp_ic_agent_canister_id_lookup_latency"), + format!("Time it took to resolve the canister ID"), + CANISTER_LABELS, + vec![0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6], + registry + ) + .unwrap(), + + smtp_requests: register_int_counter_vec_with_registry!( + format!("smtp_ic_agent_smtp_requests"), + format!("Number of IC SMTP requests"), + REQUEST_LABELS, + registry + ) + .unwrap(), + + smtp_request_latency: register_histogram_vec_with_registry!( + format!("smtp_ic_agent_smtp_request_latency"), + format!("Time it took to execute IC SMTP request"), + REQUEST_LABELS, + vec![0.2, 0.4, 0.8, 1.6, 3.2, 6.4], + registry + ) + .unwrap(), + } + } +} + /// Parses raw MIME email into IC SMTP Message pub fn parse_email(raw: &[u8]) -> Result { let parsed = MessageParser::new() diff --git a/ic-bn-lib/src/smtp/inbound/manager.rs b/ic-bn-lib/src/smtp/inbound/manager.rs index f93e247..6c06a6f 100644 --- a/ic-bn-lib/src/smtp/inbound/manager.rs +++ b/ic-bn-lib/src/smtp/inbound/manager.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, time::Instant}; use tokio::io::AsyncWriteExt; use tokio_rustls::server::TlsStream; @@ -6,10 +6,14 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info}; use crate::{ + IpFamily as _, network::{AsyncReadWrite, tls_handshake}, - smtp::inbound::{ - Session, SessionConfig, SessionData, SessionError, SessionResult, SessionTlsMode, - SessionUpgrade, + smtp::{ + Metrics, + inbound::{ + Session, SessionConfig, SessionData, SessionError, SessionResult, SessionTlsMode, + SessionUpgrade, + }, }, }; @@ -24,11 +28,18 @@ impl SessionManager { stream: S, remote_addr: SocketAddr, params: Arc, + metrics: Metrics, shutdown_token: CancellationToken, ) { // Convert v6-mapped address to v4 let remote_ip = remote_addr.ip().to_canonical(); - let mut session = Session::new(remote_ip, stream, params); + let mut session = Session::new(remote_ip, stream, params, metrics); + + session + .metrics + .sessions_open + .with_label_values(&[remote_ip.family()]) + .inc(); match session.handle(shutdown_token.child_token()).await { Ok(v) => match v { @@ -87,6 +98,33 @@ impl SessionManager { } async fn notify(session: &Session, error: Option) { + let ip_family = session.remote_ip.family(); + + let tls_proto = session + .tls_info + .as_ref() + .map_or("", |x| x.protocol.as_str().unwrap_or_default()); + let error_lbl: &'static str = error.as_ref().map_or("", |x| x.into()); + session + .metrics + .sessions_processed + .with_label_values(&[ip_family, tls_proto, error_lbl]) + .inc(); + session + .metrics + .sessions_open + .with_label_values(&[ip_family]) + .dec(); + session + .metrics + .session_duration + .with_label_values(&[ip_family, tls_proto]) + .observe( + Instant::now() + .duration_since(session.counters.started) + .as_secs_f64(), + ); + if let Some(v) = session.cfg.notifications_handler.clone() { let meta = session.meta(); tokio::spawn(async move { @@ -99,6 +137,8 @@ impl SessionManager { impl Session { /// Converts the plain-text session into a TLS one by doing a TLS handshake pub async fn into_tls(self) -> SessionResult>> { + let ip_family = self.remote_ip.family(); + // SAFETY: We should end up here only if TLS is enabled. // It's better to panic otherwise. let tls_config = match &self.cfg.tls_mode { @@ -112,23 +152,44 @@ impl Session { let (stream, tls_info) = match tls_handshake(tls_config, self.stream).await { Ok(v) => v, Err(e) => { + let error = SessionError::TlsHandshakeFailed(e.to_string()); let error_str = e.to_string(); + let error_lbl: &'static str = (&error).into(); // Session is partially consumed by `tls_handshake`, so we can't use `Manager::notify()` + self.metrics + .sessions_processed + .with_label_values(&[ip_family, "", error_lbl]) + .inc(); + self.metrics + .sessions_open + .with_label_values(&[ip_family]) + .dec(); + self.metrics + .session_duration + .with_label_values(&[ip_family, ""]) + .observe( + Instant::now() + .duration_since(self.counters.started) + .as_secs_f64(), + ); + if let Some(v) = self.cfg.notifications_handler.clone() { tokio::spawn(async move { v.notify_session_finish( meta, - Some(SessionError::TlsHandshakeFailed(error_str.clone())), + Some(SessionError::TlsHandshakeFailed(error_str)), ) .await; }); } - return Err(SessionError::TlsHandshakeFailed(e.to_string())); + return Err(error); } }; + let tls_proto = tls_info.protocol.as_str().unwrap_or_default(); + Ok(Session { id: self.id, remote_ip: self.remote_ip, @@ -141,6 +202,8 @@ impl Session { counters: self.counters, cfg: self.cfg, tls_info: Some(tls_info), + labels: [ip_family, tls_proto], + metrics: self.metrics, }) } } diff --git a/ic-bn-lib/src/smtp/inbound/mod.rs b/ic-bn-lib/src/smtp/inbound/mod.rs index 8ceca19..f705c83 100644 --- a/ic-bn-lib/src/smtp/inbound/mod.rs +++ b/ic-bn-lib/src/smtp/inbound/mod.rs @@ -12,6 +12,7 @@ use std::{ time::{Duration, Instant}, }; +use anyhow::Context; use bytes::Bytes; use fqdn::FQDN; use hickory_resolver::net::NetError; @@ -20,19 +21,21 @@ use mail_auth::MessageAuthenticator; use rustls::ServerConfig; use smtp_proto::{ EXT_8BIT_MIME, EXT_CHUNKING, EXT_ENHANCED_STATUS_CODES, EXT_PIPELINING, EXT_SIZE, - EXT_SMTP_UTF8, EXT_START_TLS, EhloResponse, Error as SmtpError, + EXT_SMTP_UTF8, EXT_START_TLS, EhloResponse, Error as SmtpError, Request, request::receiver::{ BdatReceiver, DataReceiver, DummyDataReceiver, DummyLineReceiver, RequestReceiver, }, }; use strum::{Display, IntoStaticStr}; +use tokio::io::AsyncWriteExt; +use tokio_util::time::FutureExt; use uuid::Uuid; use crate::{ network::AsyncReadWrite, smtp::{ DeliversMail, DummyDeliveryAgent, DummyRecipientResolver, EmailMessage, MessageError, - ProtocolError, ReceivesNotifications, ResolvesRecipient, address::EmailAddress, + Metrics, ProtocolError, ReceivesNotifications, ResolvesRecipient, address::EmailAddress, }, }; @@ -109,6 +112,7 @@ pub struct SessionConfig { ehlo_tls: Bytes, max_message_size: usize, + pub max_recipients: usize, pub max_session_duration: Duration, pub max_session_data: usize, @@ -298,21 +302,31 @@ pub struct Session { counters: SessionCounters, cfg: Arc, tls_info: Option, + labels: [&'static str; 2], + metrics: Metrics, } impl Display for Session { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "SMTP/Session({}){}", + "SMTP/Session({}{})", self.remote_ip, if self.tls_info.is_some() { "/TLS" } else { "" } - ) + )?; + + if let Some(v) = &self.data.ehlo_hostname { + write!(f, "({v})")?; + } + + Ok(()) } } impl Session { - pub fn new(remote_ip: IpAddr, stream: S, cfg: Arc) -> Self { + pub fn new(remote_ip: IpAddr, stream: S, cfg: Arc, metrics: Metrics) -> Self { + let ip_family = if remote_ip.is_ipv4() { "v4" } else { "v6" }; + Self { #[cfg(not(test))] id: Uuid::now_v7(), @@ -325,10 +339,30 @@ impl Session { counters: SessionCounters::new(), cfg, tls_info: None, + labels: [ip_family, ""], + metrics, } } + /// Closes the connection + pub async fn shutdown(&mut self) -> SessionResult<()> { + self.stream + .shutdown() + .timeout(Duration::from_secs(10)) + .await + .context("shutdown timed out")? + .context("shutdown failed")?; + + Ok(()) + } + fn set_error(&mut self, error: ProtocolError) { + let error_lbl: &'static str = (&error).into(); + self.metrics + .protocol_errors + .with_label_values(&[self.labels[0], self.labels[1], error_lbl]) + .inc(); + self.data.last_error = Some(error.clone()); self.counters.errors += 1; @@ -339,6 +373,17 @@ impl Session { } fn notify_message(&self, msg: Arc, error: Option) { + self.metrics + .message_size + .with_label_values(&self.labels) + .observe(msg.body.len() as f64); + + let error_lbl: &'static str = error.as_ref().map_or("", |x| x.into()); + self.metrics + .messages + .with_label_values(&[self.labels[0], self.labels[1], error_lbl]) + .inc(); + if let Some(v) = self.cfg.notifications_handler.clone() { let meta = self.meta(); tokio::spawn(async move { @@ -362,5 +407,29 @@ impl Session { } } +/// Hand-made implementation of IntoStaticStr for an SMTP request +const fn request_str(req: &Request) -> &'static str { + match req { + Request::Bdat { .. } => "BDAT", + Request::Data => "DATA", + Request::Ehlo { .. } => "EHLO", + Request::Helo { .. } => "HELO", + Request::Help { .. } => "HELP", + Request::Mail { .. } => "MAIL", + Request::Rcpt { .. } => "RCPT", + Request::Noop { .. } => "NOOP", + Request::Quit => "QUIT", + Request::Rset => "RSET", + Request::StartTls => "STARTTLS", + Request::Atrn { .. } => "ATRN", + Request::Auth { .. } => "AUTH", + Request::Burl { .. } => "BURL", + Request::Etrn { .. } => "ETRN", + Request::Expn { .. } => "EXPN", + Request::Lhlo { .. } => "LHLO", + Request::Vrfy { .. } => "VRFY", + } +} + #[cfg(test)] mod tests; diff --git a/ic-bn-lib/src/smtp/inbound/session.rs b/ic-bn-lib/src/smtp/inbound/session.rs index 7e79686..6fd004a 100644 --- a/ic-bn-lib/src/smtp/inbound/session.rs +++ b/ic-bn-lib/src/smtp/inbound/session.rs @@ -3,10 +3,9 @@ use std::{ fmt::{self, Write as _}, io::Write as _, sync::Arc, - time::{Duration, Instant}, + time::Instant, }; -use anyhow::Context; use arrayvec::ArrayString; use mail_auth::{AuthenticatedMessage, DkimResult}; use smtp_proto::{ @@ -27,6 +26,7 @@ use crate::{ DeliveryError, EmailMessage, MessageError, ProtocolError, inbound::{ MAX_REPLY_LEN, Session, SessionError, SessionResult, SessionState, SessionUpgrade, + request_str, }, }, }; @@ -36,6 +36,12 @@ impl Session { pub async fn write(&mut self, bytes: &[u8]) -> SessionResult<()> { self.stream.write_all(bytes).await?; self.stream.flush().await?; + + self.metrics + .bytes_tx + .with_label_values(&self.labels) + .inc_by(bytes.len() as u64); + Ok(()) } @@ -45,6 +51,11 @@ impl Session { /// a stack array to avoid heap allocation for performance reasons. /// If ever this module would need more - increase the constant. pub(crate) async fn reply(&mut self, code: &str, ext: &str, msg: &str) -> SessionResult<()> { + self.metrics + .replies + .with_label_values(&[self.labels[0], self.labels[1], code, ext]) + .inc(); + let len = code.len() + ext.len() + msg.len() + 4; assert!( len <= MAX_REPLY_LEN, @@ -66,6 +77,11 @@ impl Session { ext: &str, msg_fn: impl FnOnce(&mut ArrayString) -> fmt::Result, ) -> SessionResult<()> { + self.metrics + .replies + .with_label_values(&[self.labels[0], self.labels[1], code, ext]) + .inc(); + let mut buf = ArrayString::::new(); write!(&mut buf, "{code} {ext} ")?; @@ -113,6 +129,11 @@ impl Session { match self.stream.read(&mut buf).timeout(v).await { Ok(Ok(bytes_read)) => { if bytes_read > 0 { + self.metrics + .bytes_rx + .with_label_values(&self.labels) + .inc_by(bytes_read as u64); + self.reply( "501", "5.7.1", @@ -213,6 +234,11 @@ impl Session { /// Main SMTP state machine async fn ingest(&mut self, bytes: &[u8]) -> SessionResult { + self.metrics + .bytes_rx + .with_label_values(&self.labels) + .inc_by(bytes.len() as u64); + // Check if we are over session transfer quota if self.counters.bytes_ingested + bytes.len() > self.cfg.max_session_data { self.reply("452", "4.7.28", "Session transfer quota exceeded.") @@ -251,64 +277,77 @@ impl Session { } SessionState::Request(rx) => { match rx.ingest(&mut iter) { - Ok(request) => match request { - // ASCII data - Request::Data => { - debug!("{self}: <- DATA"); - if self.can_accept_message().await? { - self.write(b"354 Start mail input; end with .\r\n") + Ok(request) => { + self.metrics + .commands + .with_label_values(&[ + self.labels[0], + self.labels[1], + request_str(&request), + ]) + .inc(); + + match request { + // ASCII data + Request::Data => { + debug!("{self}: <- DATA"); + if self.can_accept_message().await? { + self.write( + b"354 Start mail input; end with .\r\n", + ) .await?; - self.data.message = Vec::with_capacity(1024); - state = SessionState::Data(DataReceiver::new()); - continue; + self.data.message = Vec::with_capacity(1024); + state = SessionState::Data(DataReceiver::new()); + continue; + } } - } - // Binary data - Request::Bdat { - chunk_size, - is_last, - } => { - debug!("{self}: <- BDAT"); - // Check if we will be past max message limit with this chunk - state = if self.data.message.len() + chunk_size - > self.cfg.max_message_size - { - SessionState::DataTooLarge(DummyDataReceiver::new_bdat( - chunk_size, - )) - } else { - // Preallocate the needed capacity for the chunk if need be - let free = - self.data.message.capacity() - self.data.message.len(); - if free < chunk_size { - self.data.message.reserve(chunk_size - free); + // Binary data + Request::Bdat { + chunk_size, + is_last, + } => { + debug!("{self}: <- BDAT"); + // Check if we will be past max message limit with this chunk + state = if self.data.message.len() + chunk_size + > self.cfg.max_message_size + { + SessionState::DataTooLarge(DummyDataReceiver::new_bdat( + chunk_size, + )) + } else { + // Preallocate the needed capacity for the chunk if need be + let free = + self.data.message.capacity() - self.data.message.len(); + if free < chunk_size { + self.data.message.reserve(chunk_size - free); + } + + SessionState::Bdat(BdatReceiver::new(chunk_size, is_last)) } - - SessionState::Bdat(BdatReceiver::new(chunk_size, is_last)) } - } - Request::StartTls => { - debug!("{self}: <- STARTTLS"); - if self.tls_info.is_some() { - self.set_error(ProtocolError::InvalidSequenceOfCommands( - "STARTTLS inside STARTTLS".into(), - )); - self.reply("504", "5.7.4", "Already in TLS mode.").await?; - } else if !self.cfg.tls_mode.enabled() { - self.set_error(ProtocolError::InvalidSequenceOfCommands( - "STARTTLS without TLS enabled".into(), - )); - self.reply("502", "5.7.0", "TLS not available.").await?; - } else { - self.reply("220", "2.0.0", "Ready to start TLS.").await?; - self.state = state; - return Ok(SessionUpgrade::StartTls); + Request::StartTls => { + debug!("{self}: <- STARTTLS"); + if self.tls_info.is_some() { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "STARTTLS inside STARTTLS".into(), + )); + self.reply("504", "5.7.4", "Already in TLS mode.").await?; + } else if !self.cfg.tls_mode.enabled() { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "STARTTLS without TLS enabled".into(), + )); + self.reply("502", "5.7.0", "TLS not available.").await?; + } else { + self.reply("220", "2.0.0", "Ready to start TLS.").await?; + self.state = state; + return Ok(SessionUpgrade::StartTls); + } + } + other_request => { + self.handle_request(other_request).await?; } } - other_request => { - self.handle_request(other_request).await?; - } - }, + } Err(SmtpError::ResponseTooLong) => { state = SessionState::RequestTooLarge(DummyLineReceiver::default()); continue; @@ -642,16 +681,4 @@ impl Session { self.data.rcpt_to.clear(); self.data.message.clear(); } - - /// Closes the connection - pub async fn shutdown(&mut self) -> SessionResult<()> { - self.stream - .shutdown() - .timeout(Duration::from_secs(10)) - .await - .context("shutdown timed out")? - .context("shutdown failed")?; - - Ok(()) - } } diff --git a/ic-bn-lib/src/smtp/inbound/tests.rs b/ic-bn-lib/src/smtp/inbound/tests.rs index f2b89d2..d6b0e67 100644 --- a/ic-bn-lib/src/smtp/inbound/tests.rs +++ b/ic-bn-lib/src/smtp/inbound/tests.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use ic_bn_lib_common::types::http::ListenerOpts; use mail_parser::{Addr, Address, MessageParser}; use mail_send::{SmtpClientBuilder, mail_builder::MessageBuilder}; +use prometheus::Registry; use rustls::{ClientConfig, ProtocolVersion, pki_types::ServerName}; use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; use tokio_rustls::TlsConnector; @@ -104,7 +105,12 @@ fn create_session(stream: S, greeting_delay: Option cfg.max_session_data = 8192; cfg.max_recipients = 3; - Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)) + Session::new( + IpAddr::from_str("1.1.1.1").unwrap(), + stream, + Arc::new(cfg), + Metrics::new(&Registry::new()), + ) } fn create_basic_stream() -> tokio_test::io::Builder { @@ -259,7 +265,12 @@ async fn test_bdat() { cfg.recipient_resolver = Arc::new(resolver); let remote_ip = IpAddr::from_str("1.1.1.1").unwrap(); - let mut session = Session::new(remote_ip, stream, Arc::new(cfg)); + let mut session = Session::new( + remote_ip, + stream, + Arc::new(cfg), + Metrics::new(&Registry::new()), + ); assert!(matches!( session.handle(CancellationToken::new()).await.unwrap_err(), @@ -300,7 +311,12 @@ async fn test_data() { cfg.recipient_resolver = Arc::new(resolver); let remote_ip = IpAddr::from_str("1.1.1.1").unwrap(); - let mut session = Session::new(remote_ip, stream, Arc::new(cfg)); + let mut session = Session::new( + remote_ip, + stream, + Arc::new(cfg), + Metrics::new(&Registry::new()), + ); assert!(matches!( session.handle(CancellationToken::new()).await.unwrap_err(), @@ -344,7 +360,12 @@ async fn test_expand() { cfg.recipient_resolver = Arc::new(resolver); let remote_ip = IpAddr::from_str("1.1.1.1").unwrap(); - let mut session = Session::new(remote_ip, stream, Arc::new(cfg)); + let mut session = Session::new( + remote_ip, + stream, + Arc::new(cfg), + Metrics::new(&Registry::new()), + ); assert!(matches!( session.handle(CancellationToken::new()).await.unwrap_err(), @@ -523,6 +544,7 @@ async fn test_starttls() { stream1, SocketAddr::from_str("1.1.1.1:123").unwrap(), Arc::new(cfg), + Metrics::new(&Registry::new()), CancellationToken::new(), ) .await; @@ -608,7 +630,7 @@ async fn test_with_smtp_client() { let token = CancellationToken::new(); let token_child = token.child_token(); - let server = Server::new_with_listener(listener, cfg).unwrap(); + let server = Server::new_with_listener(listener, cfg, Metrics::new(&Registry::new())).unwrap(); let server_handle = tokio::spawn(async move { server.serve(token_child).await.unwrap(); }); diff --git a/ic-bn-lib/src/smtp/mod.rs b/ic-bn-lib/src/smtp/mod.rs index ca5ef19..c8636f9 100644 --- a/ic-bn-lib/src/smtp/mod.rs +++ b/ic-bn-lib/src/smtp/mod.rs @@ -6,6 +6,10 @@ use std::{ use async_trait::async_trait; use bytes::Bytes; use itertools::Itertools; +use prometheus::{ + HistogramVec, IntCounterVec, IntGaugeVec, Registry, register_histogram_vec_with_registry, + register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, +}; use strum::{Display, IntoStaticStr}; use tracing::warn; use uuid::Uuid; @@ -33,7 +37,8 @@ pub enum RecipientPolicy { } /// Recipient resolution error -#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, Debug, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] pub enum RecipientResolveError { #[error("Unknown recipient")] UnknownRecipient, @@ -46,7 +51,8 @@ pub enum RecipientResolveError { } /// Delivery error -#[derive(thiserror::Error, Clone, Debug)] +#[derive(thiserror::Error, Clone, Debug, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] pub enum DeliveryError { #[error("{0}")] Temporary(String), @@ -150,6 +156,110 @@ pub trait DeliversMail: Send + Sync + Debug { ) -> Result<(), DeliveryError>; } +#[derive(Clone)] +pub struct Metrics { + bytes_rx: IntCounterVec, + bytes_tx: IntCounterVec, + commands: IntCounterVec, + replies: IntCounterVec, + messages: IntCounterVec, + protocol_errors: IntCounterVec, + sessions_open: IntGaugeVec, + sessions_processed: IntCounterVec, + session_duration: HistogramVec, + message_size: HistogramVec, +} + +impl Metrics { + pub fn new(registry: &Registry) -> Self { + const LABELS: &[&str] = &["ip_family", "tls_proto"]; + + Self { + bytes_rx: register_int_counter_vec_with_registry!( + format!("smtp_bytes_rx"), + format!("Number of bytes received"), + LABELS, + registry + ) + .unwrap(), + + bytes_tx: register_int_counter_vec_with_registry!( + format!("smtp_bytes_tx"), + format!("Number of bytes sent"), + LABELS, + registry + ) + .unwrap(), + + commands: register_int_counter_vec_with_registry!( + format!("smtp_commands"), + format!("Number of SMTP commands received"), + &[LABELS[0], LABELS[1], "command"], + registry + ) + .unwrap(), + + replies: register_int_counter_vec_with_registry!( + format!("smtp_replies"), + format!("Number of SMTP replies sent"), + &[LABELS[0], LABELS[1], "code", "ext"], + registry + ) + .unwrap(), + + messages: register_int_counter_vec_with_registry!( + format!("smtp_messages"), + format!("Number of SMTP messages submitted"), + &[LABELS[0], LABELS[1], "error"], + registry + ) + .unwrap(), + + message_size: register_histogram_vec_with_registry!( + format!("smtp_message_size"), + format!("Size of the SMTP messages in bytes"), + LABELS, + vec![1024.0, 16384.0, 131072.0, 524288.0, 2097152.0], + registry + ) + .unwrap(), + + protocol_errors: register_int_counter_vec_with_registry!( + format!("smtp_protocol_errors"), + format!("Number of SMTP protocol errors"), + &[LABELS[0], LABELS[1], "error"], + registry + ) + .unwrap(), + + sessions_open: register_int_gauge_vec_with_registry!( + format!("smtp_sessions_open"), + format!("Number of SMTP sessions currently open"), + &[LABELS[0]], + registry + ) + .unwrap(), + + sessions_processed: register_int_counter_vec_with_registry!( + format!("smtp_sessions_processed"), + format!("Number of SMTP sessions processed"), + &[LABELS[0], LABELS[1], "error"], + registry + ) + .unwrap(), + + session_duration: register_histogram_vec_with_registry!( + format!("smtp_session_duration"), + format!("Time in seconds that the session was open"), + LABELS, + vec![5.0, 10.0, 30.0, 60.0, 120.0], + registry + ) + .unwrap(), + } + } +} + #[derive(Debug)] pub struct DummyRecipientResolver; diff --git a/ic-bn-lib/src/smtp/server.rs b/ic-bn-lib/src/smtp/server.rs index caf9b7c..91bc244 100644 --- a/ic-bn-lib/src/smtp/server.rs +++ b/ic-bn-lib/src/smtp/server.rs @@ -11,7 +11,10 @@ use tracing::{info, warn}; use crate::{ network::listener::listen_tcp, - smtp::inbound::{SessionConfig, manager::SessionManager}, + smtp::{ + Metrics, + inbound::{SessionConfig, manager::SessionManager}, + }, }; /// Listens for new SMTP connections and creates sessions @@ -20,6 +23,7 @@ pub struct Server { listener: TcpListener, params: Arc, tracker: TaskTracker, + metrics: Metrics, } impl Display for Server { @@ -30,18 +34,23 @@ impl Display for Server { impl Server { /// Creates a new `Server` to listen on `listen_addr` - pub fn new(listen_addr: SocketAddr, cfg: SessionConfig) -> io::Result { + pub fn new(listen_addr: SocketAddr, cfg: SessionConfig, metrics: Metrics) -> io::Result { let listener = listen_tcp(listen_addr, ListenerOpts::default())?; - Self::new_with_listener(listener, cfg) + Self::new_with_listener(listener, cfg, metrics) } /// Creates a new `Server` from a pre-built `TcpListener` - pub fn new_with_listener(listener: TcpListener, params: SessionConfig) -> io::Result { + pub fn new_with_listener( + listener: TcpListener, + params: SessionConfig, + metrics: Metrics, + ) -> io::Result { Ok(Self { listen_addr: listener.local_addr()?, listener, params: Arc::new(params), tracker: TaskTracker::new(), + metrics, }) } @@ -56,7 +65,12 @@ impl Server { let (params, token) = (self.params.clone(), token.child_token()); self.tracker.spawn(SessionManager::handle_connection( - stream, addr, params, token, + stream, + addr, + params, + // Metrics are cheap to clone (Arc inside) + self.metrics.clone(), + token, )); } diff --git a/ic-bn-lib/src/vector/client.rs b/ic-bn-lib/src/vector/client.rs index a58746f..f60f776 100644 --- a/ic-bn-lib/src/vector/client.rs +++ b/ic-bn-lib/src/vector/client.rs @@ -6,12 +6,13 @@ use std::{ use crate::{ http::{client::basic_auth, headers::CONTENT_TYPE_OCTET_STREAM}, - hval, vector, + hval, + vector::{self, VectorOptions}, }; use anyhow::{Context, Error, anyhow}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use http::header::{AUTHORIZATION, CONTENT_ENCODING, CONTENT_TYPE}; -use ic_bn_lib_common::{traits::http::Client as HttpClient, types::vector::VectorCli}; +use ic_bn_lib_common::traits::http::Client as HttpClient; use prometheus::{ HistogramVec, IntCounterVec, IntGaugeVec, Registry, register_histogram_vec_with_registry, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, @@ -222,38 +223,36 @@ pub struct Vector { impl Vector { pub fn new( - cli: &VectorCli, + opts: VectorOptions, client: Arc, namespace: &str, registry: &Registry, ) -> Self { let metrics = Metrics::new(registry); - Self::new_with_metrics(cli, client, namespace, metrics) + Self::new_with_metrics(opts, client, namespace, metrics) } pub fn new_with_metrics( - cli: &VectorCli, + opts: VectorOptions, client: Arc, namespace: &str, metrics: Metrics, ) -> Self { - let cli = cli.clone(); - - let (tx_event, rx_event) = mpsc::channel(cli.log_vector_buffer); - let (tx_batch, rx_batch) = async_channel::bounded(cli.log_vector_batch_queue); + let (tx_event, rx_event) = mpsc::channel(opts.buffer); + let (tx_batch, rx_batch) = async_channel::bounded(opts.batch_queue); // Start batcher warn!("Vector: starting batcher"); let token_batcher = CancellationToken::new(); - let mut interval = interval(cli.log_vector_interval); + let mut interval = interval(opts.batch_flush_interval); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval.reset(); let batcher = Batcher { rx: rx_event, tx: tx_batch, - batch: Vec::with_capacity(cli.log_vector_batch), + batch: Vec::with_capacity(opts.batch_size), interval, token: token_batcher.child_token(), namespace: namespace.to_string(), @@ -271,24 +270,22 @@ impl Vector { let tracker_flushers = TaskTracker::new(); // Prepare auth header - let auth = cli - .log_vector_user - .map(|x| basic_auth(x, cli.log_vector_pass)); + let auth = opts.user.map(|x| basic_auth(x, opts.pass)); - warn!("Vector: starting flushers ({})", cli.log_vector_flushers); - for id in 0..cli.log_vector_flushers { + warn!("Vector: starting flushers ({})", opts.flushers); + for id in 0..opts.flushers { let flusher = Flusher { id, rx: rx_batch.clone(), client: client.clone(), - url: cli.log_vector_url.clone().unwrap(), + url: opts.url.clone(), auth: auth.clone(), - zstd_level: cli.log_vector_zstd_level, + zstd_level: opts.zstd_level, token: token_flushers.child_token(), token_drain: token_flushers_drain.child_token(), - retry_interval: cli.log_vector_retry_interval, - retry_count: cli.log_vector_retry_count, - timeout: cli.log_vector_timeout, + retry_interval: opts.retry_interval, + retry_count: opts.retry_count, + timeout: opts.flush_timeout, namespace: namespace.to_string(), metrics: metrics.clone(), }; @@ -753,29 +750,27 @@ mod test { ); } - fn make_cli() -> VectorCli { - VectorCli { - log_vector_url: Some(Url::parse("http://127.0.0.1:1234").unwrap()), - log_vector_user: None, - log_vector_pass: None, - log_vector_batch: 50, - log_vector_buffer: 5000, - log_vector_interval: Duration::from_secs(100), - log_vector_timeout: Duration::from_secs(10), - log_vector_flushers: 4, - log_vector_zstd_level: 3, - log_vector_batch_queue: 32, - log_vector_retry_interval: Duration::from_millis(1), - log_vector_retry_count: 100, + fn make_opts() -> VectorOptions { + VectorOptions { + url: Url::parse("http://127.0.0.1:1234").unwrap(), + user: None, + pass: None, + batch_size: 50, + buffer: 5000, + batch_flush_interval: Duration::from_secs(100), + flush_timeout: Duration::from_secs(10), + flushers: 4, + zstd_level: 3, + batch_queue: 32, + retry_interval: Duration::from_millis(1), + retry_count: 100, } } #[tokio::test] async fn test_vector() { - let cli = make_cli(); - let client = Arc::new(TestClient(AtomicU64::new(0), AtomicU64::new(0))); - let vector = Vector::new(&cli, client.clone(), "", &Registry::new()); + let vector = Vector::new(make_opts(), client.clone(), "", &Registry::new()); for i in 0..5000 { let event = json!({ @@ -793,16 +788,16 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_vector_drain_alive() { - let mut cli = make_cli(); - cli.log_vector_buffer = 10000; - cli.log_vector_batch = 1000; - cli.log_vector_interval = Duration::from_secs(1); - cli.log_vector_flushers = 32; + let mut opts = make_opts(); + opts.buffer = 10000; + opts.batch_size = 1000; + opts.batch_flush_interval = Duration::from_secs(1); + opts.flushers = 32; let client = Arc::new(TestClientOk); - let vector = Vector::new(&cli, client, "", &Registry::new()); + let vector = Vector::new(opts.clone(), client, "", &Registry::new()); - for _ in 0..cli.log_vector_buffer { + for _ in 0..opts.buffer { let event = json!({ "env": "prod", "hostname": "da11-bnp00", @@ -851,17 +846,15 @@ mod test { assert_eq!( vector.metrics.sent_events.with_label_values(&[""]).get(), - cli.log_vector_buffer as u64, + opts.buffer as u64, ); } /// Make sure we can drain when the endpoint is down #[tokio::test] async fn test_vector_drain_dead() { - let cli = make_cli(); - let client = Arc::new(TestClientDead); - let vector = Vector::new(&cli, client, "", &Registry::new()); + let vector = Vector::new(make_opts(), client, "", &Registry::new()); for i in 0..6000 { let event = json!({ diff --git a/ic-bn-lib/src/vector/mod.rs b/ic-bn-lib/src/vector/mod.rs index d1a2365..921b98c 100644 --- a/ic-bn-lib/src/vector/mod.rs +++ b/ic-bn-lib/src/vector/mod.rs @@ -1,14 +1,59 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, time::Duration}; -use anyhow::Context; +use anyhow::{Context, anyhow}; use bytes::BytesMut; +use ic_bn_lib_common::types::vector::VectorCli; use prost::Message; +use url::Url; use vrl::value::{ObjectMap, Value}; pub mod client; #[allow(warnings, clippy::all, clippy::pedantic)] mod event; +/// Vector options +#[derive(Clone, Debug)] +pub struct VectorOptions { + pub url: Url, + pub user: Option, + pub pass: Option, + pub batch_size: usize, + pub batch_queue: usize, + pub batch_flush_interval: Duration, + pub buffer: usize, + pub flushers: usize, + pub flush_timeout: Duration, + pub retry_interval: Duration, + pub retry_count: usize, + pub zstd_level: usize, +} + +impl TryFrom<&VectorCli> for VectorOptions { + type Error = anyhow::Error; + + fn try_from(v: &VectorCli) -> Result { + let url = v + .log_vector_url + .clone() + .ok_or_else(|| anyhow!("URL is required"))?; + + Ok(Self { + url, + user: v.log_vector_user.clone(), + pass: v.log_vector_pass.clone(), + batch_size: v.log_vector_batch, + batch_queue: v.log_vector_batch_queue, + batch_flush_interval: v.log_vector_interval, + buffer: v.log_vector_buffer, + flushers: v.log_vector_flushers, + flush_timeout: v.log_vector_timeout, + retry_interval: v.log_vector_retry_interval, + retry_count: v.log_vector_retry_count, + zstd_level: v.log_vector_zstd_level, + }) + } +} + pub fn encode_map(fields: ObjectMap) -> event::ValueMap { event::ValueMap { fields: fields