From 3320a08d44a2d28bbf62dd0f8cd8f5a7184b265e Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Thu, 28 May 2026 16:03:49 +0000 Subject: [PATCH 1/9] add message id --- ic-bn-lib/src/smtp/inbound/mod.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/ic-bn-lib/src/smtp/inbound/mod.rs b/ic-bn-lib/src/smtp/inbound/mod.rs index a29c0b6..fa01fb7 100644 --- a/ic-bn-lib/src/smtp/inbound/mod.rs +++ b/ic-bn-lib/src/smtp/inbound/mod.rs @@ -218,8 +218,9 @@ impl Default for SessionState { } /// SMTP dynamic session data -#[derive(Debug, Default)] +#[derive(Debug)] pub struct SessionData { + message_id: Uuid, reverse_ip_verified: bool, ehlo_hostname: Option, mail_from: Option, @@ -227,6 +228,19 @@ pub struct SessionData { message: Vec, } +impl Default for SessionData { + fn default() -> Self { + Self { + message_id: Uuid::now_v7(), + reverse_ip_verified: false, + ehlo_hostname: None, + mail_from: None, + rcpt_to: vec![], + message: vec![], + } + } +} + /// SMTP session counters #[derive(Debug)] pub struct SessionCounters { From b507629d624feade55b92a8e9c46de3311cbee72 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Fri, 29 May 2026 10:38:02 +0000 Subject: [PATCH 2/9] Add namespace to vector, make metrics public --- ic-bn-lib/src/vector/client.rs | 181 +++++++++++++++++++++++---------- 1 file changed, 128 insertions(+), 53 deletions(-) diff --git a/ic-bn-lib/src/vector/client.rs b/ic-bn-lib/src/vector/client.rs index 1a70515..5d6e34d 100644 --- a/ic-bn-lib/src/vector/client.rs +++ b/ic-bn-lib/src/vector/client.rs @@ -13,9 +13,8 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use http::header::{AUTHORIZATION, CONTENT_ENCODING, CONTENT_TYPE}; use ic_bn_lib_common::{traits::http::Client as HttpClient, types::vector::VectorCli}; use prometheus::{ - Histogram, IntCounter, IntCounterVec, IntGauge, Registry, register_histogram_with_registry, - register_int_counter_vec_with_registry, register_int_counter_with_registry, - register_int_gauge_with_registry, + HistogramVec, IntCounterVec, IntGaugeVec, Registry, register_histogram_vec_with_registry, + register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, }; use reqwest::{Method, Request, header::HeaderValue}; use serde_json::Value; @@ -34,85 +33,94 @@ pub const MB: f64 = 1024.0 * KB; const CONTENT_ENCODING_ZSTD: HeaderValue = hval!("zstd"); #[derive(Clone)] -struct Metrics { - sent: IntCounter, - sent_compressed: IntCounter, - sent_events: IntCounter, - buffer_event_size: IntGauge, - batch_size: IntGauge, - buffer_drops: IntCounter, - encoding_failures: IntCounter, - batch_buffer_size: IntGauge, - batch_flush_retries: IntCounter, +pub struct Metrics { + sent: IntCounterVec, + sent_compressed: IntCounterVec, + sent_events: IntCounterVec, + buffer_event_size: IntGaugeVec, + batch_size: IntGaugeVec, + buffer_drops: IntCounterVec, + encoding_failures: IntCounterVec, + batch_buffer_size: IntGaugeVec, + batch_flush_retries: IntCounterVec, batch_flushes: IntCounterVec, - batch_queue_duration: Histogram, - batch_encode_duration: Histogram, - batch_flush_duration: Histogram, - batch_sizes: Histogram, + batch_queue_duration: HistogramVec, + batch_encode_duration: HistogramVec, + batch_flush_duration: HistogramVec, + batch_sizes: HistogramVec, } impl Metrics { pub fn new(registry: &Registry) -> Self { Self { - sent: register_int_counter_with_registry!( + sent: register_int_counter_vec_with_registry!( format!("vector_sent"), format!("Number of bytes sent"), + &["namespace"], registry ) .unwrap(), - sent_compressed: register_int_counter_with_registry!( + sent_compressed: register_int_counter_vec_with_registry!( format!("vector_sent_compressed"), format!("Number of bytes sent (compressed)"), + &["namespace"], registry ) .unwrap(), - sent_events: register_int_counter_with_registry!( + sent_events: register_int_counter_vec_with_registry!( format!("vector_sent_events"), format!("Number of events sent"), + &["namespace"], registry ) .unwrap(), - buffer_event_size: register_int_gauge_with_registry!( + buffer_event_size: register_int_gauge_vec_with_registry!( format!("vector_event_buffer_size"), format!("Number of events in the incoming buffer"), + &["namespace"], registry ) .unwrap(), - batch_size: register_int_gauge_with_registry!( + batch_size: register_int_gauge_vec_with_registry!( format!("vector_batch_size"), format!("Current size of the events queued for the next batch"), + &["namespace"], registry ) .unwrap(), - buffer_drops: register_int_counter_with_registry!( + buffer_drops: register_int_counter_vec_with_registry!( format!("vector_buffer_drops"), format!("Number of events that were dropped due to buffer overflow"), + &["namespace"], registry ) .unwrap(), - encoding_failures: register_int_counter_with_registry!( + encoding_failures: register_int_counter_vec_with_registry!( format!("vector_encoding_failures"), format!("Number of events that were dropped due to encoding failure"), + &["namespace"], registry ) .unwrap(), - batch_buffer_size: register_int_gauge_with_registry!( + batch_buffer_size: register_int_gauge_vec_with_registry!( format!("vector_batch_buffer_size"), format!("Number of batches in the outgoing buffer"), + &["namespace"], registry ) .unwrap(), - batch_flush_retries: register_int_counter_with_registry!( + batch_flush_retries: register_int_counter_vec_with_registry!( format!("vector_batch_flush_retries"), format!("Number of batch flush retries"), + &["namespace"], registry ) .unwrap(), @@ -120,38 +128,42 @@ impl Metrics { batch_flushes: register_int_counter_vec_with_registry!( format!("vector_batch_flushes"), format!("Count of batch flushes"), - &["ok"], + &["namespace", "ok"], registry ) .unwrap(), - batch_queue_duration: register_histogram_with_registry!( + batch_queue_duration: register_histogram_vec_with_registry!( format!("vector_batch_queue_duration"), format!("Time it takes to queue the batch"), + &["namespace"], vec![0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2], registry ) .unwrap(), - batch_encode_duration: register_histogram_with_registry!( + batch_encode_duration: register_histogram_vec_with_registry!( format!("vector_batch_encode_duration"), format!("Time it takes to encode the batch"), + &["namespace"], vec![0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2], registry ) .unwrap(), - batch_flush_duration: register_histogram_with_registry!( + batch_flush_duration: register_histogram_vec_with_registry!( format!("vector_batch_flush_duration"), format!("Time it takes to flush the batch"), + &["namespace"], vec![0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2], registry ) .unwrap(), - batch_sizes: register_histogram_with_registry!( + batch_sizes: register_histogram_vec_with_registry!( format!("vector_batch_sizes"), format!("Batch sizes histogram"), + &["namespace"], vec![ 128.0 * KB, 256.0 * KB, @@ -198,6 +210,7 @@ pub fn encode_batch(batch: Vec) -> Result { } pub struct Vector { + namespace: String, token_batcher: CancellationToken, token_flushers: CancellationToken, token_flushers_drain: CancellationToken, @@ -208,14 +221,27 @@ pub struct Vector { } impl Vector { - pub fn new(cli: &VectorCli, client: Arc, registry: &Registry) -> Self { + pub fn new( + cli: &VectorCli, + client: Arc, + namespace: &str, + registry: &Registry, + ) -> Self { + let metrics = Metrics::new(registry); + Self::new_with_metrics(cli, client, namespace, metrics) + } + + pub fn new_with_metrics( + cli: &VectorCli, + client: Arc, + namespace: &str, + metrics: Metrics, + ) -> Self { let cli = cli.clone(); let (tx_event, rx_event) = mpsc::channel(cli.log_vector_buffer); let (tx_batch, rx_batch) = async_channel::bounded(cli.log_vector_batch_queue); - let metrics = Metrics::new(registry); - // Start batcher warn!("Vector: starting batcher"); let token_batcher = CancellationToken::new(); @@ -230,6 +256,7 @@ impl Vector { batch: Vec::with_capacity(cli.log_vector_batch), interval, token: token_batcher.child_token(), + namespace: namespace.to_string(), metrics: metrics.clone(), }; @@ -262,6 +289,7 @@ impl Vector { retry_interval: cli.log_vector_retry_interval, retry_count: cli.log_vector_retry_count, timeout: cli.log_vector_timeout, + namespace: namespace.to_string(), metrics: metrics.clone(), }; @@ -271,6 +299,7 @@ impl Vector { } Self { + namespace: namespace.to_string(), token_batcher, token_flushers, token_flushers_drain, @@ -284,9 +313,15 @@ impl Vector { pub fn send(&self, event: Value) { // If it fails we'll lose the event, but it's better than to block & eat memory. if self.tx.try_send(event).is_err() { - self.metrics.buffer_drops.inc(); + self.metrics + .buffer_drops + .with_label_values(&[&self.namespace]) + .inc(); } else { - self.metrics.buffer_event_size.inc(); + self.metrics + .buffer_event_size + .with_label_values(&[&self.namespace]) + .inc(); }; } @@ -316,6 +351,7 @@ struct Batcher { batch: Vec, interval: Interval, token: CancellationToken, + namespace: String, metrics: Metrics, } @@ -328,7 +364,10 @@ impl Display for Batcher { impl Batcher { async fn add_to_batch(&mut self, event: Value) { self.batch.push(event); - self.metrics.batch_size.set(self.batch.len() as i64); + self.metrics + .batch_size + .with_label_values(&[&self.namespace]) + .set(self.batch.len() as i64); // If we've reached the capacity - it's time to flush if self.batch.len() == self.batch.capacity() { @@ -355,8 +394,14 @@ impl Batcher { debug!("{self}: batch ({len} events) queued in {dur}s"); - self.metrics.batch_queue_duration.observe(dur); - self.metrics.batch_buffer_size.inc(); + self.metrics + .batch_queue_duration + .with_label_values(&[&self.namespace]) + .observe(dur); + self.metrics + .batch_buffer_size + .with_label_values(&[&self.namespace]) + .inc(); } async fn drain(&mut self) { @@ -389,7 +434,7 @@ impl Batcher { }, Some(event) = self.rx.recv() => { - self.metrics.buffer_event_size.dec(); + self.metrics.buffer_event_size.with_label_values(&[&self.namespace]).dec(); self.add_to_batch(event).await; } } @@ -409,6 +454,7 @@ struct Flusher { zstd_level: usize, token: CancellationToken, token_drain: CancellationToken, + namespace: String, metrics: Metrics, } @@ -473,7 +519,10 @@ impl Flusher { // Bytes is cheap to clone if let Err(e) = self.send_batch(batch.clone(), timeout).await { - self.metrics.batch_flushes.with_label_values(&["no"]).inc(); + self.metrics + .batch_flushes + .with_label_values(&[&self.namespace, "no"]) + .inc(); warn!( "{self}: unable to send (try {}, retry interval {}s): {e:#}", @@ -481,9 +530,18 @@ impl Flusher { interval.as_secs_f64() ); } else { - self.metrics.sent.inc_by(raw_size as u64); - self.metrics.sent_compressed.inc_by(batch.len() as u64); - self.metrics.batch_flushes.with_label_values(&["yes"]).inc(); + self.metrics + .sent + .with_label_values(&[&self.namespace]) + .inc_by(raw_size as u64); + self.metrics + .sent_compressed + .with_label_values(&[&self.namespace]) + .inc_by(batch.len() as u64); + self.metrics + .batch_flushes + .with_label_values(&[&self.namespace, "yes"]) + .inc(); debug!("{self}: batch sent in {}s", start.elapsed().as_secs_f64()); return Ok(()); @@ -493,7 +551,10 @@ impl Flusher { interval = (interval + self.retry_interval).min(self.retry_interval * 5); timeout = (timeout + self.timeout).min(self.timeout * 10); - self.metrics.batch_flush_retries.inc(); + self.metrics + .batch_flush_retries + .with_label_values(&[&self.namespace]) + .inc(); retries += 1; // Limit the retry count and reset the interval/timeout if we're draining. @@ -517,7 +578,10 @@ impl Flusher { #[allow(clippy::cognitive_complexity)] async fn process_batch(&self, batch: Batch) { let len = batch.events.len(); - self.metrics.batch_buffer_size.dec(); + self.metrics + .batch_buffer_size + .with_label_values(&[&self.namespace]) + .dec(); debug!("{self}: received batch ({len} events)"); @@ -527,25 +591,36 @@ impl Flusher { Ok(v) => { self.metrics .batch_encode_duration + .with_label_values(&[&self.namespace]) .observe(start.elapsed().as_secs_f64()); - self.metrics.batch_sizes.observe(v.len() as f64); + self.metrics + .batch_sizes + .with_label_values(&[&self.namespace]) + .observe(v.len() as f64); // Send it let start = Instant::now(); if let Err(e) = self.send_batch_retry(v).await { warn!("{self}: unable to flush: {e:#}"); } else { - self.metrics.sent_events.inc_by(len as u64); + self.metrics + .sent_events + .with_label_values(&[&self.namespace]) + .inc_by(len as u64); }; self.metrics .batch_flush_duration + .with_label_values(&[&self.namespace]) .observe(start.elapsed().as_secs_f64()); debug!("{self}: {len} events flushed"); } Err(e) => { - self.metrics.encoding_failures.inc(); + self.metrics + .encoding_failures + .with_label_values(&[&self.namespace]) + .inc(); warn!("{self}: unable to encode batch: {e:#}") } }; @@ -689,7 +764,7 @@ mod test { let cli = make_cli(); let client = Arc::new(TestClient(AtomicU64::new(0), AtomicU64::new(0))); - let vector = Vector::new(&cli, client.clone(), &Registry::new()); + let vector = Vector::new(&cli, client.clone(), "", &Registry::new()); for i in 0..5000 { let event = json!({ @@ -714,7 +789,7 @@ mod test { cli.log_vector_flushers = 32; let client = Arc::new(TestClientOk); - let vector = Vector::new(&cli, client, &Registry::new()); + let vector = Vector::new(&cli, client, "", &Registry::new()); for _ in 0..cli.log_vector_buffer { let event = json!({ @@ -764,7 +839,7 @@ mod test { vector.stop().await; assert_eq!( - vector.metrics.sent_events.get(), + vector.metrics.sent_events.with_label_values(&[""]).get(), cli.log_vector_buffer as u64, ); } @@ -775,7 +850,7 @@ mod test { let cli = make_cli(); let client = Arc::new(TestClientDead); - let vector = Vector::new(&cli, client, &Registry::new()); + let vector = Vector::new(&cli, client, "", &Registry::new()); for i in 0..6000 { let event = json!({ From 945dfd6223d148c3231ad43ee5756953a50a0776 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Fri, 29 May 2026 15:58:22 +0000 Subject: [PATCH 3/9] SMTP: add notification handler, ProtocolError etc --- ic-bn-lib/src/smtp/ic/delivery_agent.rs | 11 +- ic-bn-lib/src/smtp/inbound/ehlo.rs | 18 +- ic-bn-lib/src/smtp/inbound/mail_from.rs | 42 +- ic-bn-lib/src/smtp/inbound/manager.rs | 50 +- ic-bn-lib/src/smtp/inbound/mod.rs | 684 ++--------------------- ic-bn-lib/src/smtp/inbound/rcpt_to.rs | 17 +- ic-bn-lib/src/smtp/inbound/session.rs | 115 ++-- ic-bn-lib/src/smtp/inbound/tests.rs | 712 ++++++++++++++++++++++++ ic-bn-lib/src/smtp/mod.rs | 72 ++- 9 files changed, 1016 insertions(+), 705 deletions(-) create mode 100644 ic-bn-lib/src/smtp/inbound/tests.rs diff --git a/ic-bn-lib/src/smtp/ic/delivery_agent.rs b/ic-bn-lib/src/smtp/ic/delivery_agent.rs index 1c72251..3b83f92 100644 --- a/ic-bn-lib/src/smtp/ic/delivery_agent.rs +++ b/ic-bn-lib/src/smtp/ic/delivery_agent.rs @@ -359,9 +359,12 @@ impl ResolvesRecipient for IcSmtpDeliveryAgent { #[cfg(test)] mod tests { - use std::sync::{ - Mutex, - atomic::{AtomicUsize, Ordering}, + use std::{ + net::IpAddr, + sync::{ + Mutex, + atomic::{AtomicUsize, Ordering}, + }, }; use crate::{ @@ -607,6 +610,8 @@ mod tests { let message = EmailMessage { id: Uuid::nil(), + session_id: Uuid::nil(), + remote_ip: IpAddr::from_str("1.1.1.1").unwrap(), ehlo_hostname: fqdn!("foo.bar"), mail_from: email!("john@doe.com"), rcpt_to: vec![ diff --git a/ic-bn-lib/src/smtp/inbound/ehlo.rs b/ic-bn-lib/src/smtp/inbound/ehlo.rs index 70e2a28..4f3f222 100644 --- a/ic-bn-lib/src/smtp/inbound/ehlo.rs +++ b/ic-bn-lib/src/smtp/inbound/ehlo.rs @@ -6,7 +6,10 @@ use tracing::{debug, info}; use crate::{ http::dns::is_error_negative_lookup, network::AsyncReadWrite, - smtp::inbound::{Session, SessionResult}, + smtp::{ + ProtocolError, + inbound::{Session, SessionResult}, + }, }; impl Session { @@ -15,6 +18,9 @@ impl Session { // Validate hostname let Ok(ehlo_hostname) = FQDN::from_str(host) else { info!("{self}: {host}: Invalid EHLO hostname"); + self.set_error(ProtocolError::InvalidEhloHostname(format!( + "{host}: incorrect hostname" + ))); return self.reply("550", "5.5.0", "Invalid EHLO hostname.").await; }; @@ -28,6 +34,9 @@ impl Session { if ehlo_hostname.depth() < 2 { info!("{self}: {host}: EHLO is not FQDN"); + self.set_error(ProtocolError::InvalidEhloHostname(format!( + "{host}: not FQDN" + ))); return self .reply("550", "5.5.0", "EHLO hostname must be an FQDN.") .await; @@ -42,6 +51,9 @@ impl Session { } None => { info!("{self}: {host}: EHLO not found in DNS"); + self.set_error(ProtocolError::InvalidEhloHostname(format!( + "{host}: not found in DNS" + ))); return self .reply("550", "5.5.0", "EHLO hostname not found in DNS.") .await; @@ -52,6 +64,10 @@ impl Session { info!("{self}: {host}: EHLO not found in DNS: {e:#}"); if is_error_negative_lookup(&e) { + self.set_error(ProtocolError::InvalidEhloHostname(format!( + "{host}: not found in DNS" + ))); + return self .reply("550", "5.5.0", "EHLO hostname not found in DNS.") .await; diff --git a/ic-bn-lib/src/smtp/inbound/mail_from.rs b/ic-bn-lib/src/smtp/inbound/mail_from.rs index 2a4de91..f9b9863 100644 --- a/ic-bn-lib/src/smtp/inbound/mail_from.rs +++ b/ic-bn-lib/src/smtp/inbound/mail_from.rs @@ -13,6 +13,7 @@ use crate::{ http::dns::is_error_negative_lookup, network::AsyncReadWrite, smtp::{ + ProtocolError, address::EmailAddress, inbound::{Session, SessionResult}, }, @@ -22,12 +23,18 @@ impl Session { /// Handles MAIL FROM command pub async fn handle_mail_from(&mut self, from: MailFrom>) -> SessionResult<()> { let Some(helo_hostname) = self.data.ehlo_hostname.as_ref().map(|x| x.to_string()) else { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "MAIL FROM before EHLO".into(), + )); return self .reply("503", "5.5.1", "Polite people say EHLO first.") .await; }; if self.data.mail_from.is_some() { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "Multiple MAIL FROM".into(), + )); return self .reply( "503", @@ -56,6 +63,10 @@ impl Session { } if from.size > self.cfg.max_message_size { + self.set_error(ProtocolError::MessageTooBig(format!( + "MAIL FROM-specified size is too big: {} > {}", + from.size, self.cfg.max_message_size + ))); return self.message_too_big().await; } @@ -70,6 +81,10 @@ impl Session { // Validate address let Ok(address) = EmailAddress::from_str(&from.address) else { info!("{self}: {}: incorrect sender address", from.address); + self.set_error(ProtocolError::SenderValidationFailed(format!( + "Incorrect sender address: {}", + from.address + ))); return self .reply("550", "5.7.1", "Sender address is incorrect.") .await; @@ -88,6 +103,10 @@ impl Session { if self.cfg.verify_sender_domain { if address.domain().depth() < 2 { info!("{self}: {address}: sender domain verification failed: not FQDN"); + self.set_error(ProtocolError::SenderValidationFailed(format!( + "Sender domain is not FQDN: {}", + address.domain() + ))); return self.reply("550", "5.7.2", "Sender must be an FQDN.").await; }; @@ -103,6 +122,9 @@ impl Session { info!( "{self}: {address}: sender domain verification failed: no MX records found" ); + self.set_error(ProtocolError::SenderValidationFailed( + "No MX records found".into(), + )); return self .reply( "550", @@ -117,6 +139,9 @@ impl Session { info!( "{self}: {address}: sender domain verification failed: no MX records found" ); + self.set_error(ProtocolError::SenderValidationFailed( + "No MX records found".into(), + )); return self .reply( "550", @@ -128,6 +153,9 @@ impl Session { info!( "{self}: {address}: sender domain verification failed: temporary error: {e:#}" ); + self.set_error(ProtocolError::SenderValidationFailed(format!( + "Sender domain verification temporary error: {e:#}", + ))); return self .reply("451", "4.7.25", "Temporary error validating sender domain.") .await; @@ -157,7 +185,10 @@ impl Session { "{self}: {address}: SPF validation failed: temporary error: {:?}", output.explanation() ); - + self.set_error(ProtocolError::SpfValidationFailed(format!( + "SPF validation temporary error: {:?}", + output.explanation() + ))); return self .reply("451", "4.7.24", "Temporary SPF validation error.") .await; @@ -167,7 +198,10 @@ impl Session { "{self}: {address}: SPF validation failed: permanent error: {:?}", output.explanation() ); - + self.set_error(ProtocolError::SpfValidationFailed(format!( + "SPF validation permanent error: {:?}", + output.explanation() + ))); return self .reply_with("550", "5.7.23", |buf| { write!(buf, "SPF validation failed")?; @@ -191,6 +225,8 @@ impl Session { /// Replies about failed reverse IP verification async fn verify_reverse_ip_reply(&mut self, permanent: bool, msg: &str) -> SessionResult { + self.set_error(ProtocolError::ReverseIpValidationFailed(msg.into())); + // Emit permanent errors only if in strict mode if permanent && self.cfg.verify_reverse_ip_strict { self.reply_with("550", "5.7.25", |buf| { @@ -309,7 +345,7 @@ impl Session { .await; } - // Otherwise everything succeeded but no matches found + // Otherwise everything succeeded but no matches were found info!( "{self}: reverse IP verification failed: no addresses matching client's IP found after resolving PTR" ); diff --git a/ic-bn-lib/src/smtp/inbound/manager.rs b/ic-bn-lib/src/smtp/inbound/manager.rs index 55124eb..2b231de 100644 --- a/ic-bn-lib/src/smtp/inbound/manager.rs +++ b/ic-bn-lib/src/smtp/inbound/manager.rs @@ -35,9 +35,8 @@ impl SessionManager { SessionUpgrade::No => { session.stream.shutdown().await.ok(); } - SessionUpgrade::StartTls => { - Self::starttls(session, shutdown_token.child_token()).await + Self::handle_connection_tls(session, shutdown_token.child_token()).await } }, @@ -49,15 +48,20 @@ impl SessionManager { if let Err(e) = session.shutdown().await { debug!("{session}: error closing connection: {e:#}"); }; + + Self::notify(session, e).await; } } } /// Converts session into TLS mode & runs it - async fn starttls(session: Session, shutdown_token: CancellationToken) { + async fn handle_connection_tls( + session: Session, + shutdown_token: CancellationToken, + ) { let session_name = session.to_string(); + debug!("{session}: starting TLS handshake"); - debug!("{session_name}: starting TLS handshake"); match session.into_tls().await { Ok(mut session) => { debug!("{session}: TLS handshake succeeded"); @@ -70,6 +74,8 @@ impl SessionManager { if let Err(e) = session.shutdown().await { debug!("{session}: error closing connection: {e:#}"); }; + + Self::notify(session, e).await; } } @@ -78,6 +84,20 @@ impl SessionManager { } }; } + + async fn notify(session: Session, error: SessionError) { + if let Some(v) = &session.cfg.notifications_handler { + v.notify_session_finish( + session.id, + session.remote_ip, + session.data, + session.counters, + session.tls_info, + error, + ) + .await; + } + } } impl Session { @@ -92,7 +112,27 @@ impl Session { } }; - let (stream, tls_info) = tls_handshake(tls_config, self.stream).await?; + let (stream, tls_info) = match tls_handshake(tls_config, self.stream).await { + Ok(v) => v, + Err(e) => { + let error_str = e.to_string(); + + // Session is partially consumed by `tls_handshake`, so we can't use `Manager::notify()` + if let Some(v) = &self.cfg.notifications_handler { + v.notify_session_finish( + self.id, + self.remote_ip, + self.data, + self.counters, + self.tls_info, + SessionError::TlsHandshakeFailed(error_str.clone()), + ) + .await; + } + + return Err(SessionError::TlsHandshakeFailed(error_str)); + } + }; Ok(Session { id: self.id, diff --git a/ic-bn-lib/src/smtp/inbound/mod.rs b/ic-bn-lib/src/smtp/inbound/mod.rs index fa01fb7..d6d5ac1 100644 --- a/ic-bn-lib/src/smtp/inbound/mod.rs +++ b/ic-bn-lib/src/smtp/inbound/mod.rs @@ -25,20 +25,24 @@ use smtp_proto::{ BdatReceiver, DataReceiver, DummyDataReceiver, DummyLineReceiver, RequestReceiver, }, }; -use strum::Display; +use strum::{Display, IntoStaticStr}; use uuid::Uuid; use crate::{ network::AsyncReadWrite, smtp::{ - DeliversMail, DummyDeliveryAgent, DummyRecipientResolver, ResolvesRecipient, - address::EmailAddress, + DeliversMail, DummyDeliveryAgent, DummyRecipientResolver, ProtocolError, + ReceivesNotifications, ResolvesRecipient, address::EmailAddress, }, }; pub(crate) const MAX_REPLY_LEN: usize = 256; -#[derive(thiserror::Error, Debug)] +/// Error that leads to session termination. +/// The only "expected" error is `Quit` that is caused by the client QUIT +/// command. +#[derive(thiserror::Error, Debug, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] pub enum SessionError { #[error("I/O error: {0}")] Io(#[from] io::Error), @@ -48,6 +52,8 @@ pub enum SessionError { Dns(#[from] NetError), #[error("Timed out")] Timeout, + #[error("TLS handshake failed: {0}")] + TlsHandshakeFailed(String), #[error("{0}")] SmtpError(#[from] SmtpError), #[error("Session terminated by client (QUIT)")] @@ -125,6 +131,7 @@ pub struct SessionConfig { pub authenticator: Arc, pub recipient_resolver: Arc, pub delivery_agent: Arc, + pub notifications_handler: Option>, } impl SessionConfig { @@ -165,6 +172,7 @@ impl SessionConfig { authenticator: Arc::new(MessageAuthenticator::new_cloudflare().unwrap()), recipient_resolver: Arc::new(DummyRecipientResolver), delivery_agent: Arc::new(DummyDeliveryAgent), + notifications_handler: None, } } @@ -218,9 +226,10 @@ impl Default for SessionState { } /// SMTP dynamic session data -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct SessionData { message_id: Uuid, + last_error: Option, reverse_ip_verified: bool, ehlo_hostname: Option, mail_from: Option, @@ -231,7 +240,11 @@ pub struct SessionData { impl Default for SessionData { fn default() -> Self { Self { + #[cfg(not(test))] message_id: Uuid::now_v7(), + #[cfg(test)] + message_id: Uuid::nil(), + last_error: None, reverse_ip_verified: false, ehlo_hostname: None, mail_from: None, @@ -242,12 +255,12 @@ impl Default for SessionData { } /// SMTP session counters -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct SessionCounters { - valid_until: Instant, - bytes_ingested: usize, - messages_queued: usize, - errors: usize, + pub valid_until: Instant, + pub bytes_ingested: usize, + pub messages_queued: usize, + pub errors: usize, } impl SessionCounters { @@ -263,14 +276,14 @@ impl SessionCounters { /// SMTP Session pub struct Session { - id: Uuid, - remote_ip: IpAddr, + pub id: Uuid, + pub remote_ip: IpAddr, stream: S, state: SessionState, - data: SessionData, - counters: SessionCounters, + pub data: SessionData, + pub counters: SessionCounters, cfg: Arc, - tls_info: Option, + pub tls_info: Option, } impl Display for Session { @@ -287,7 +300,10 @@ impl Display for Session { impl Session { pub fn new(remote_ip: IpAddr, stream: S, cfg: Arc) -> Self { Self { + #[cfg(not(test))] id: Uuid::now_v7(), + #[cfg(test)] + id: Uuid::nil(), remote_ip, stream, state: SessionState::Greeting, @@ -297,638 +313,12 @@ impl Session { tls_info: None, } } -} - -#[cfg(test)] -mod tests { - use std::{net::SocketAddr, str::FromStr, sync::Mutex}; - - use async_trait::async_trait; - use fqdn::fqdn; - use ic_bn_lib_common::types::http::ListenerOpts; - use mail_parser::{Addr, Address, MessageParser}; - use mail_send::{SmtpClientBuilder, mail_builder::MessageBuilder}; - use rustls::{ClientConfig, pki_types::ServerName}; - use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; - use tokio_rustls::TlsConnector; - use tokio_util::sync::CancellationToken; - - use crate::{ - email, - network::listener::listen_tcp, - smtp::{ - DeliveryError, EmailMessage, RecipientPolicy, RecipientResolveError, - inbound::manager::SessionManager, server::Server, - }, - tests::{TEST_CERT_1, TEST_KEY_1}, - tls::{resolver::StubResolver, verify::NoopServerCertVerifier}, - }; - - use super::*; - - #[derive(Debug, Default)] - pub struct TestDeliveryAgent(Mutex>, Option); - - #[async_trait] - impl DeliversMail for TestDeliveryAgent { - async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError> { - if let Some(e) = &self.1 { - return Err(e.clone()); - } - - *self.0.lock().unwrap() = Some(message); - Ok(()) - } - } - - #[derive(Debug)] - pub struct TestRecipientResolver( - EmailAddress, - Option, - Option>, - ); - - #[async_trait] - impl ResolvesRecipient for TestRecipientResolver { - async fn resolve_recipient( - &self, - _from: &EmailAddress, - rcpt: &EmailAddress, - ) -> Result { - assert_eq!(rcpt, &self.0); - if let Some(v) = &self.1 { - return Ok(RecipientPolicy::Rewrite(v.clone())); - } - - if let Some(v) = &self.2 { - return Ok(RecipientPolicy::Expand(v.clone())); - } - - Ok(RecipientPolicy::Accept) - } - } - - fn create_session( - stream: S, - greeting_delay: Option, - ) -> Session { - let mut cfg = SessionConfig::new("test", 512); - cfg.max_errors = 5; - cfg.greeting_delay = greeting_delay; - cfg.max_messages_per_session = 3; - cfg.max_session_data = 8192; - cfg.max_recipients = 3; - - Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)) - } - - fn create_basic_stream() -> tokio_test::io::Builder { - let mut builder = tokio_test::io::Builder::new(); - - builder.write(b"220 test ESMTP IC SMTP Gateway\r\n") - .read(b"HELO foo.bar\r\n") - .write(b"250 test you had me at HELO\r\n") - .read(b"EHLO foo.bar\r\n") - .write(b"250-test you had me at EHLO\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); - - builder - } - - fn stream_send_message(b: &mut tokio_test::io::Builder) { - b.read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"DATA\r\n") - .write(b"354 Start mail input; end with .\r\n") - .read(b"foobarmessage\r\n.\r\n") - .write(b"250 2.0.0 Message (13 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n"); - } - - #[tokio::test] - async fn test_ehlo_required() { - let stream = tokio_test::io::Builder::new() - .write(b"220 test ESMTP IC SMTP Gateway\r\n") - .read(b"MAIL FROM:\r\n") - .write(b"503 5.5.1 Polite people say EHLO first.\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_pipelining() { - let mut builder = create_basic_stream(); - let stream = builder - .read(b"MAIL FROM:\r\nRCPT TO:\r\nDATA\r\n") - .write(b"250 2.1.0 OK\r\n250 2.1.5 OK\r\n354 Start mail input; end with .\r\n") - .read(b"foobarmessage\r\n.\r\n") - .write(b"250 2.0.0 Message (13 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_basic_session() { - let mut builder = create_basic_stream(); - builder - .read(b"RCPT TO:\r\n") - .write(b"503 5.5.1 MAIL FROM is required first.\r\n") - .read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"MAIL FROM:\r\n") - .write(b"503 5.5.1 Multiple MAIL FROM commands are not allowed.\r\n") - .read(b"DATA\r\n") - .write(b"503 5.5.1 RCPT TO is required first.\r\n") - .read(b"RSET\r\n") - .write(b"250 2.0.0 OK\r\n") - .read(b"NOOP\r\n") - .write(b"250 2.0.0 OK\r\n") - .read(b"FOOB\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"HELP\r\n") - .write(b"502 5.5.1 Command not implemented.\r\n"); - - stream_send_message(&mut builder); - let stream = builder - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_client_sends_before_greeting() { - let stream = tokio_test::io::Builder::new() - .read(b"EHLO foo.bar\r\n") - .write(b"501 5.7.1 Client sent command before greeting banner.\r\n") - .build(); - - let mut session = create_session(stream, Some(Duration::from_millis(100))); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::SendsBeforeGreeting - )); - } - - #[tokio::test] - async fn test_bdat() { - let stream = create_basic_stream() - .read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"BDAT 10\r\n") - .read(b"01234") - .read(b"56789") - .write(b"250 2.6.0 Chunk accepted.\r\n") - .read(b"BDAT 10\r\n") - .read(b"987") - .read(b"654") - .read(b"3210") - .write(b"250 2.6.0 Chunk accepted.\r\n") - .read(b"BDAT 10 LAST\r\n") - .read(b"0123456789") - .write(b"250 2.0.0 Message (30 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let agent = Arc::new(TestDeliveryAgent::default()); - let resolver = TestRecipientResolver( - "dead@beef".try_into().unwrap(), - Some("bar@baz".try_into().unwrap()), - None, - ); - - let mut cfg = SessionConfig::new("test", 512); - cfg.delivery_agent = agent.clone(); - cfg.recipient_resolver = Arc::new(resolver); - - let mut session = Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - - // Make sure the agent gets the correct mail - assert_eq!( - agent.0.lock().unwrap().clone(), - Some(EmailMessage { - id: Uuid::nil(), - ehlo_hostname: fqdn!("foo.bar"), - mail_from: "foo@bar".try_into().unwrap(), - rcpt_to: vec!["bar@baz".try_into().unwrap()], - body: "012345678998765432100123456789".into(), - }) - ); - } - - #[tokio::test] - async fn test_data() { - let mut builder = create_basic_stream(); - stream_send_message(&mut builder); - - let stream = builder - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let agent = Arc::new(TestDeliveryAgent::default()); - let resolver = TestRecipientResolver( - "dead@beef".try_into().unwrap(), - Some("bar@baz".try_into().unwrap()), - None, - ); - - let mut cfg = SessionConfig::new("test", 512); - cfg.delivery_agent = agent.clone(); - cfg.recipient_resolver = Arc::new(resolver); - - let mut session = Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - - // Make sure the agent gets the correct mail - assert_eq!( - agent.0.lock().unwrap().clone(), - Some(EmailMessage { - id: Uuid::nil(), - ehlo_hostname: fqdn!("foo.bar"), - mail_from: email!("foo@bar"), - rcpt_to: vec![email!("bar@baz")], - body: "foobarmessage".into(), - }) - ) - } - - #[tokio::test] - async fn test_expand() { - let mut builder = create_basic_stream(); - stream_send_message(&mut builder); - - let stream = builder - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let agent = Arc::new(TestDeliveryAgent::default()); - let resolver = TestRecipientResolver( - "dead@beef".try_into().unwrap(), - None, - Some(vec![ - "dead@dead".try_into().unwrap(), - "bar@bax".try_into().unwrap(), - ]), - ); - - let mut cfg = SessionConfig::new("test", 512); - cfg.delivery_agent = agent.clone(); - cfg.recipient_resolver = Arc::new(resolver); - - let mut session = Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - - // Make sure the agent gets the correct mail - assert_eq!( - agent.0.lock().unwrap().clone(), - Some(EmailMessage { - id: Uuid::nil(), - ehlo_hostname: fqdn!("foo.bar"), - mail_from: email!("foo@bar"), - rcpt_to: vec![email!("dead@beef"), email!("dead@dead"), email!("bar@bax"),], - body: "foobarmessage".into(), - }) - ) - } - - #[tokio::test] - async fn test_max_recipients() { - let stream = create_basic_stream() - .read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"455 4.5.3 Too many recipients.\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - #[tokio::test] - async fn test_max_message_size() { - let stream = create_basic_stream() - .read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"DATA\r\n") - .write(b"354 Start mail input; end with .\r\n") - .read(format!("{}\r\n.\r\n", "1".repeat(513)).as_bytes()) - .write(b"552 5.3.4 Message too big, we accept up to 512 bytes.\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_max_messages_per_session() { - let mut builder = create_basic_stream(); - stream_send_message(&mut builder); - stream_send_message(&mut builder); - stream_send_message(&mut builder); - - let stream = builder - .read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"DATA\r\n") - .write(b"452 4.4.5 Maximum number of messages per session exceeded.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::TooManyMessagesPerSession - )); - } - - #[tokio::test] - async fn test_max_errors() { - let stream = tokio_test::io::Builder::new() - .write(b"220 test ESMTP IC SMTP Gateway\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"452 4.3.2 Too many errors.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::TooManyErrors - )); - } - - #[tokio::test] - async fn test_request_too_large() { - let stream = tokio_test::io::Builder::new() - .write(b"220 test ESMTP IC SMTP Gateway\r\n") - .read(format!("EHLO {}", "1".repeat(2048)).as_bytes()) - .read(format!("{}\r\n", "1".repeat(2048)).as_bytes()) - .write(b"554 5.3.4 Line is too long.\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_max_session_transfer_quota() { - let stream = tokio_test::io::Builder::new() - .write(b"220 test ESMTP IC SMTP Gateway\r\n") - .read(format!("EHLO {}\r\n", "1".repeat(8192)).as_bytes()) - .write(b"452 4.7.28 Session transfer quota exceeded.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::TransferQuotaExceeded(_) - )); - } - - #[tokio::test] - async fn test_starttls() { - rustls::crypto::ring::default_provider() - .install_default() - .ok(); - - // Use an in-memory pipe - let (stream1, mut stream2) = duplex(8192); - - let rustls_server_cfg = ServerConfig::builder() - .with_no_client_auth() - .with_cert_resolver(Arc::new( - StubResolver::new(TEST_CERT_1.as_bytes(), TEST_KEY_1.as_bytes()).unwrap(), - )); - - let mut cfg = SessionConfig::new("test", 512); - cfg.tls_mode = SessionTlsMode::Required(Arc::new(rustls_server_cfg)); - - tokio::spawn(async move { - SessionManager::handle_connection( - stream1, - SocketAddr::from_str("1.1.1.1:123").unwrap(), - Arc::new(cfg), - CancellationToken::new(), - ) - .await; - }); - - let mut buf = vec![0; 8192]; - - let r = stream2.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"220 test ESMTP IC SMTP Gateway\r\n"); - - // Make sure EHLO advertises 250-STARTTLS - stream2.write_all(b"EHLO foo.bar\r\n").await.unwrap(); - let r = stream2.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"250-test you had me at EHLO\r\n250-STARTTLS\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); - - // Make sure TLS is required by the server due to SessionTlsMode::Required - stream2.write_all(b"MAIL FROM:\r\n").await.unwrap(); - let r = stream2.read(&mut buf).await.unwrap(); - assert_eq!( - &buf[..r], - b"503 5.5.1 TLS is required to submit mail on this server.\r\n" - ); - - // Fire up TLS handshake - stream2.write_all(b"STARTTLS\r\n").await.unwrap(); - let r = stream2.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"220 2.0.0 Ready to start TLS.\r\n"); - - let rustls_client_cfg = ClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(Arc::new(NoopServerCertVerifier::default())) - .with_no_client_auth(); - let tls_connector = TlsConnector::from(Arc::new(rustls_client_cfg)); - let mut tls_stream = tls_connector - .connect(ServerName::try_from("foo").unwrap(), stream2) - .await - .unwrap(); - - // Make sure there's no 250-STARTTLS and 250-REQUIRETLS in EHLO anymore inside TLS session - tls_stream.write_all(b"EHLO foo.bar\r\n").await.unwrap(); - let r = tls_stream.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"250-test you had me at EHLO\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); - - // No TLS-in-TLS allowed - tls_stream.write_all(b"STARTTLS\r\n").await.unwrap(); - let r = tls_stream.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"504 5.7.4 Already in TLS mode.\r\n"); - - // Now MAIL FROM should work - tls_stream.write_all(b"MAIL FROM:\r\n").await.unwrap(); - let r = tls_stream.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"250 2.1.0 OK\r\n"); - - // All good - tls_stream.write_all(b"QUIT\r\n").await.unwrap(); - let r = tls_stream.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"221 2.0.0 Bye.\r\n"); - } - - #[tokio::test] - async fn test_with_smtp_client() { - rustls::crypto::ring::default_provider() - .install_default() - .ok(); - - let rustls_server_cfg = ServerConfig::builder() - .with_no_client_auth() - .with_cert_resolver(Arc::new( - StubResolver::new(TEST_CERT_1.as_bytes(), TEST_KEY_1.as_bytes()).unwrap(), - )); - - let agent = Arc::new(TestDeliveryAgent::default()); - - // Listen on the random free port - let listener = listen_tcp("127.0.0.1:0".parse().unwrap(), ListenerOpts::default()).unwrap(); - let port = listener.local_addr().unwrap().port(); - - let mut cfg = SessionConfig::new("test", 10 * 1024 * 1024); - cfg.delivery_agent = agent.clone(); - cfg.tls_mode = SessionTlsMode::Allowed(Arc::new(rustls_server_cfg)); - - let server = Server::new_with_listener(listener, cfg).unwrap(); - - tokio::spawn(async move { - server.serve(CancellationToken::new()).await.unwrap(); - }); - - let message = MessageBuilder::new() - .from(("John Doe", "john@doe.com")) - .to(("Jane Doe", "jane@doe.com")) - .subject("Hello") - .text_body("Blah"); - - let mut client = SmtpClientBuilder::new("127.0.0.1", port) - .unwrap() - .implicit_tls(false) - .helo_host("foo.bar") - .allow_invalid_certs() - .connect() - .await - .unwrap(); - - // Try some commands - client.noop().await.unwrap(); - client.rset().await.unwrap(); - - // Make sure we have the required caps - let caps = client.capabilities("foo.bar", false).await.unwrap(); - assert_eq!( - caps.capabilities, - EXT_SMTP_UTF8 - | EXT_8BIT_MIME - | EXT_CHUNKING - | EXT_ENHANCED_STATUS_CODES - | EXT_SIZE - | EXT_PIPELINING - ); - - client.send(message).await.unwrap(); - - // Make sure the agent gets the correct mail - let msg = agent.0.lock().unwrap().clone().unwrap(); - assert_eq!(msg.id, Uuid::nil()); - assert_eq!(msg.ehlo_hostname, "foo.bar"); - assert_eq!(msg.mail_from, "john@doe.com"); - assert_eq!(msg.rcpt_to, vec!["jane@doe.com"]); - - let parsed = MessageParser::new().parse(&msg.body).unwrap(); - assert_eq!(parsed.subject(), Some("Hello")); - assert_eq!( - *parsed.from().unwrap(), - Address::List(vec![Addr::new(Some("John Doe"), "john@doe.com")]) - ); - assert_eq!( - *parsed.to().unwrap(), - Address::List(vec![Addr::new(Some("Jane Doe"), "jane@doe.com")]) - ); - assert_eq!(parsed.body_text(0).unwrap(), "Blah"); + fn set_error(&mut self, error: ProtocolError) { + self.data.last_error = Some(error); + self.counters.errors += 1; } } + +#[cfg(test)] +mod tests; diff --git a/ic-bn-lib/src/smtp/inbound/rcpt_to.rs b/ic-bn-lib/src/smtp/inbound/rcpt_to.rs index 72c49d8..736d17d 100644 --- a/ic-bn-lib/src/smtp/inbound/rcpt_to.rs +++ b/ic-bn-lib/src/smtp/inbound/rcpt_to.rs @@ -8,7 +8,7 @@ use tracing::{debug, info}; use crate::{ network::AsyncReadWrite, smtp::{ - RecipientPolicy, RecipientResolveError, + ProtocolError, RecipientPolicy, RecipientResolveError, address::EmailAddress, inbound::{MAX_REPLY_LEN, Session, SessionResult}, }, @@ -19,6 +19,9 @@ impl Session { /// Handles RCPT TO command pub async fn handle_rcpt_to(&mut self, to: RcptTo>) -> SessionResult<()> { let Some(mail_from) = &self.data.mail_from else { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "RCPT TO before MAIL FROM".into(), + )); return self .reply("503", "5.5.1", "MAIL FROM is required first.") .await; @@ -35,6 +38,10 @@ impl Session { let Ok(address) = EmailAddress::from_str(&to.address) else { info!("{self}: {}: incorrect address", to.address); + self.set_error(ProtocolError::RecipientValidationFailed(format!( + "Incorrect address: {}", + to.address + ))); return self.reply("550", "5.1.2", "Incorrect address.").await; }; @@ -44,6 +51,11 @@ impl Session { if self.data.rcpt_to.len() >= self.cfg.max_recipients { info!("{self}: {}: too many recipients", to.address); + self.set_error(ProtocolError::RecipientValidationFailed(format!( + "Too many recipients: {} > {}", + self.data.rcpt_to.len(), + self.cfg.max_recipients + ))); return self.reply("455", "4.5.3", "Too many recipients.").await; } @@ -72,6 +84,9 @@ impl Session { Err(e) => { info!("{self}: {}: recipient resolution error: {e:#}", to.address); + self.set_error(ProtocolError::RecipientValidationFailed(format!( + "Recipient resolution failed: {e:#}", + ))); return match e { RecipientResolveError::UnknownDomain => { diff --git a/ic-bn-lib/src/smtp/inbound/session.rs b/ic-bn-lib/src/smtp/inbound/session.rs index 113f901..d97e7d6 100644 --- a/ic-bn-lib/src/smtp/inbound/session.rs +++ b/ic-bn-lib/src/smtp/inbound/session.rs @@ -23,7 +23,7 @@ use uuid::Uuid; use crate::{ network::AsyncReadWrite, smtp::{ - DeliveryError, EmailMessage, + DeliveryError, EmailMessage, MessageError, ProtocolError, inbound::{ MAX_REPLY_LEN, Session, SessionError, SessionResult, SessionState, SessionUpgrade, }, @@ -90,6 +90,11 @@ impl Session { pub(crate) async fn message_too_big(&mut self) -> SessionResult<()> { let max_size = self.cfg.max_message_size; + self.set_error(ProtocolError::MessageTooBig(format!( + "{} > {}", + self.data.message.len(), + max_size + ))); self.reply_with("552", "5.3.4", |buf| { write!(buf, "Message too big, we accept up to {max_size} bytes.",) }) @@ -125,7 +130,7 @@ impl Session { } async fn handle_error(&mut self, error: SmtpError) -> SessionResult<()> { - self.counters.errors += 1; + self.set_error(ProtocolError::SmtpError(error.to_string())); let (code, ext, msg) = match error { SmtpError::UnknownCommand | SmtpError::InvalidResponse { .. } => { @@ -196,9 +201,9 @@ impl Session { self.reply("250", "2.0.0", "OK").await?; } _ => { + self.set_error(ProtocolError::SmtpError("Command not implemented".into())); self.reply("502", "5.5.1", "Command not implemented.") .await?; - self.counters.errors += 1; } } @@ -284,11 +289,15 @@ impl Session { Request::StartTls => { debug!("{self}: <- STARTTLS"); if self.tls_info.is_some() { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "STARTTLS inside STARTTLS".into(), + )); self.reply("504", "5.7.4", "Already in TLS mode.").await?; - self.counters.errors += 1; } else if !self.cfg.tls_mode.enabled() { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "STARTTLS without TLS enabled".into(), + )); self.reply("502", "5.7.0", "TLS not available.").await?; - self.counters.errors += 1; } else { self.reply("220", "2.0.0", "Ready to start TLS.").await?; self.state = state; @@ -347,9 +356,9 @@ impl Session { SessionState::RequestTooLarge(rx) => { // If line-feed found - issue error, otherwise keep ingesting if rx.ingest(&mut iter) { + self.set_error(ProtocolError::SmtpError("Line is too long".into())); self.reply("554", "5.3.4", "Line is too long.").await?; state = SessionState::default(); - self.counters.errors += 1; } else { // No line-feed found yet break; @@ -360,7 +369,6 @@ impl Session { if rx.ingest(&mut iter) { self.message_too_big().await?; state = SessionState::default(); - self.counters.errors += 1; } else { // No end-of-message marker found yet break; @@ -422,10 +430,10 @@ impl Session { async fn verify_message( &mut self, #[allow(unused_variables)] msg: &EmailMessage, - ) -> SessionResult { + ) -> SessionResult> { #[cfg(test)] { - return Ok(true); + return Ok(None); } let Some(auth_message) = AuthenticatedMessage::parse(&msg.body) else { @@ -435,7 +443,7 @@ impl Session { ); self.reply("550", "5.7.7", "Failed to parse the message") .await?; - return Ok(false); + return Ok(Some(MessageError::ParsingFailed)); }; if auth_message.received_headers_count() > self.cfg.max_received_headers { @@ -449,7 +457,7 @@ impl Session { "Too many 'Received' headers. Possible loop detected.", ) .await?; - return Ok(false); + return Ok(Some(MessageError::TooManyReceivedHeaders)); } if self.cfg.verify_dkim { @@ -459,14 +467,14 @@ impl Session { let log_name = self.to_string(); // Replies with either a temporary or a permanent error code - let mut reply = async || -> SessionResult<()> { + let mut reply = async |error: &str| -> SessionResult<()> { if outputs .iter() .any(|x| matches!(x.result(), DkimResult::TempError(_))) { // If any of the signatures that failed with a temporary error - return temporary SMTP error code info!( - "{log_name}: {}: {} -> {:?}: DKIM verification temporary failure", + "{log_name}: {}: {} -> {:?}: DKIM verification temporary failure: {error}", msg.ehlo_hostname, msg.mail_from, msg.rcpt_to ); self.reply("451", "4.7.20", "DKIM validation temporary failure.") @@ -474,7 +482,7 @@ impl Session { } else { // Otherwise permanent info!( - "{log_name}: {}: {} -> {:?}: DKIM verification failure", + "{log_name}: {}: {} -> {:?}: DKIM verification failure: {error}", msg.ehlo_hostname, msg.mail_from, msg.rcpt_to ); self.reply("550", "5.7.20", "DKIM validation failed.").await @@ -486,17 +494,26 @@ impl Session { .filter(|x| matches!(x.result(), DkimResult::Pass | DkimResult::None)) .count(); - if strict { - // Check that *all* signatures have passed validation (or there are none) + // Get first validation error (if any) + let first_error = outputs + .iter() + .filter_map(|x| match x.result() { + DkimResult::Fail(e) | DkimResult::PermError(e) | DkimResult::TempError(e) => { + Some(e.to_string()) + } + _ => None, + }) + .next(); + + // Strict: Check that *all* signatures have passed validation (or there are none) + // else: Check if *any* of the signatures have passed validation (or there are none) + if (strict && outputs.len() != signatures_passed) + || (!strict && !outputs.is_empty() && signatures_passed == 0) + { if outputs.len() != signatures_passed { - reply().await?; - return Ok(false); - } - } else { - // Check if *any* of the signatures have passed validation (or there are none) - if !outputs.is_empty() && signatures_passed == 0 { - reply().await?; - return Ok(false); + let error = first_error.unwrap_or_default(); + reply(&error).await?; + return Ok(Some(MessageError::DkimValidationFailed(error))); } } @@ -508,32 +525,22 @@ impl Session { msg.rcpt_to, outputs.len() ); - } else { - debug!( - "{log_name}: {}: {} -> {:?}: DKIM validation succeeded ({signatures_passed}/{} signatures passed)", - msg.ehlo_hostname, - msg.mail_from, - msg.rcpt_to, - outputs.len() - ); } } - Ok(true) + Ok(None) } async fn queue_message(&mut self) -> SessionResult<()> { - #[cfg(not(test))] - let id = Uuid::now_v7(); - #[cfg(test)] - let id = Uuid::nil(); - let message_size = self.data.message.len(); + let id = self.data.message_id; // SAFETY: Code makes sure these are all Some(). // It's better to panic in tests if they are not. let msg = EmailMessage { id, + session_id: self.id, + remote_ip: self.remote_ip, ehlo_hostname: self.data.ehlo_hostname.clone().unwrap(), mail_from: self.data.mail_from.take().unwrap(), rcpt_to: std::mem::take(&mut self.data.rcpt_to), @@ -541,18 +548,29 @@ impl Session { }; // Run configured verification steps on the message body - if !self.verify_message(&msg).await? { + if let Some(e) = self.verify_message(&msg).await? { + if let Some(v) = &self.cfg.notifications_handler { + v.notify_about_message(msg.clone(), Some(e)).await; + } + self.reset_message(); return Ok(()); } + // Deliver the message. + // Message cloning is rather lightweight (body is Bytes) if let Err(e) = self.cfg.delivery_agent.deliver_mail(msg.clone()).await { info!( "{self}: {}: {} -> {:?}: message delivery failed: {e:#}", msg.ehlo_hostname, msg.mail_from, msg.rcpt_to ); - self.reset_message(); + if let Some(v) = &self.cfg.notifications_handler { + v.notify_about_message(msg.clone(), Some(MessageError::DeliveryFailed(e.clone()))) + .await; + } + + self.reset_message(); return match e { DeliveryError::Permanent(v) => { self.reply_with("550", "5.5.0", |buf| { @@ -569,6 +587,10 @@ impl Session { }; } + if let Some(v) = &self.cfg.notifications_handler { + v.notify_about_message(msg.clone(), None).await; + } + info!( "{self}: {}: {} -> {:?}: message ({message_size} bytes) queued with id {id}", msg.ehlo_hostname, msg.mail_from, msg.rcpt_to @@ -593,9 +615,11 @@ impl Session { .await?; return Err(SessionError::TooManyMessagesPerSession); } else if self.data.rcpt_to.is_empty() { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "DATA before RCPT TO".into(), + )); self.reply("503", "5.5.1", "RCPT TO is required first.") .await?; - self.counters.errors += 1; return Ok(false); } @@ -604,6 +628,15 @@ impl Session { /// Resets the message-related fields to their initial state pub(crate) fn reset_message(&mut self) { + #[cfg(not(test))] + { + self.data.message_id = Uuid::now_v7(); + } + #[cfg(test)] + { + self.data.message_id = Uuid::nil(); + } + self.data.mail_from = None; self.data.rcpt_to.clear(); self.data.message.clear(); diff --git a/ic-bn-lib/src/smtp/inbound/tests.rs b/ic-bn-lib/src/smtp/inbound/tests.rs new file mode 100644 index 0000000..f5464ef --- /dev/null +++ b/ic-bn-lib/src/smtp/inbound/tests.rs @@ -0,0 +1,712 @@ +use std::{net::SocketAddr, str::FromStr, sync::Mutex}; + +use async_trait::async_trait; +use fqdn::fqdn; +use ic_bn_lib_common::types::http::ListenerOpts; +use mail_parser::{Addr, Address, MessageParser}; +use mail_send::{SmtpClientBuilder, mail_builder::MessageBuilder}; +use rustls::{ClientConfig, ProtocolVersion, pki_types::ServerName}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; +use tokio_rustls::TlsConnector; +use tokio_util::sync::CancellationToken; + +use crate::{ + email, + network::listener::listen_tcp, + smtp::{ + DeliveryError, EmailMessage, MessageError, RecipientPolicy, RecipientResolveError, + inbound::manager::SessionManager, server::Server, + }, + tests::{TEST_CERT_1, TEST_KEY_1}, + tls::{resolver::StubResolver, verify::NoopServerCertVerifier}, +}; + +use super::*; + +#[derive(Debug, Default)] +pub struct TestDeliveryAgent(Mutex>, Option); + +#[async_trait] +impl DeliversMail for TestDeliveryAgent { + async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError> { + if let Some(e) = &self.1 { + return Err(e.clone()); + } + + *self.0.lock().unwrap() = Some(message); + Ok(()) + } +} + +#[allow(clippy::type_complexity)] +#[derive(Debug, Default)] +pub struct TestNotificationsReceiver { + msg: Mutex)>>, + sess: Mutex< + Option<( + Uuid, + IpAddr, + SessionData, + SessionCounters, + Option, + SessionError, + )>, + >, +} + +#[async_trait] +impl ReceivesNotifications for TestNotificationsReceiver { + async fn notify_about_message(&self, message: EmailMessage, error: Option) { + *self.msg.lock().unwrap() = Some((message, error)); + } + + async fn notify_session_finish( + &self, + id: Uuid, + remote_ip: IpAddr, + data: SessionData, + counters: SessionCounters, + tls_info: Option, + error: SessionError, + ) { + *self.sess.lock().unwrap() = Some((id, remote_ip, data, counters, tls_info, error)); + } +} + +#[derive(Debug)] +pub struct TestRecipientResolver( + EmailAddress, + Option, + Option>, +); + +#[async_trait] +impl ResolvesRecipient for TestRecipientResolver { + async fn resolve_recipient( + &self, + _from: &EmailAddress, + rcpt: &EmailAddress, + ) -> Result { + assert_eq!(rcpt, &self.0); + if let Some(v) = &self.1 { + return Ok(RecipientPolicy::Rewrite(v.clone())); + } + + if let Some(v) = &self.2 { + return Ok(RecipientPolicy::Expand(v.clone())); + } + + Ok(RecipientPolicy::Accept) + } +} + +fn create_session(stream: S, greeting_delay: Option) -> Session { + let mut cfg = SessionConfig::new("test", 512); + cfg.max_errors = 5; + cfg.greeting_delay = greeting_delay; + cfg.max_messages_per_session = 3; + cfg.max_session_data = 8192; + cfg.max_recipients = 3; + + Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)) +} + +fn create_basic_stream() -> tokio_test::io::Builder { + let mut builder = tokio_test::io::Builder::new(); + + builder.write(b"220 test ESMTP IC SMTP Gateway\r\n") + .read(b"HELO foo.bar\r\n") + .write(b"250 test you had me at HELO\r\n") + .read(b"EHLO foo.bar\r\n") + .write(b"250-test you had me at EHLO\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); + + builder +} + +fn stream_send_message(b: &mut tokio_test::io::Builder) { + b.read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"DATA\r\n") + .write(b"354 Start mail input; end with .\r\n") + .read(b"foobarmessage\r\n.\r\n") + .write( + b"250 2.0.0 Message (13 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n", + ); +} + +#[tokio::test] +async fn test_ehlo_required() { + let stream = tokio_test::io::Builder::new() + .write(b"220 test ESMTP IC SMTP Gateway\r\n") + .read(b"MAIL FROM:\r\n") + .write(b"503 5.5.1 Polite people say EHLO first.\r\n") + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_pipelining() { + let mut builder = create_basic_stream(); + let stream = builder + .read(b"MAIL FROM:\r\nRCPT TO:\r\nDATA\r\n") + .write(b"250 2.1.0 OK\r\n250 2.1.5 OK\r\n354 Start mail input; end with .\r\n") + .read(b"foobarmessage\r\n.\r\n") + .write( + b"250 2.0.0 Message (13 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n", + ) + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_basic_session() { + let mut builder = create_basic_stream(); + builder + .read(b"RCPT TO:\r\n") + .write(b"503 5.5.1 MAIL FROM is required first.\r\n") + .read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"MAIL FROM:\r\n") + .write(b"503 5.5.1 Multiple MAIL FROM commands are not allowed.\r\n") + .read(b"DATA\r\n") + .write(b"503 5.5.1 RCPT TO is required first.\r\n") + .read(b"RSET\r\n") + .write(b"250 2.0.0 OK\r\n") + .read(b"NOOP\r\n") + .write(b"250 2.0.0 OK\r\n") + .read(b"FOOB\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"HELP\r\n") + .write(b"502 5.5.1 Command not implemented.\r\n"); + + stream_send_message(&mut builder); + let stream = builder + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_client_sends_before_greeting() { + let stream = tokio_test::io::Builder::new() + .read(b"EHLO foo.bar\r\n") + .write(b"501 5.7.1 Client sent command before greeting banner.\r\n") + .build(); + + let mut session = create_session(stream, Some(Duration::from_millis(100))); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::SendsBeforeGreeting + )); +} + +#[tokio::test] +async fn test_bdat() { + let stream = create_basic_stream() + .read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"BDAT 10\r\n") + .read(b"01234") + .read(b"56789") + .write(b"250 2.6.0 Chunk accepted.\r\n") + .read(b"BDAT 10\r\n") + .read(b"987") + .read(b"654") + .read(b"3210") + .write(b"250 2.6.0 Chunk accepted.\r\n") + .read(b"BDAT 10 LAST\r\n") + .read(b"0123456789") + .write( + b"250 2.0.0 Message (30 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n", + ) + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let agent = Arc::new(TestDeliveryAgent::default()); + let resolver = TestRecipientResolver( + "dead@beef".try_into().unwrap(), + Some("bar@baz".try_into().unwrap()), + None, + ); + + let mut cfg = SessionConfig::new("test", 512); + cfg.delivery_agent = agent.clone(); + cfg.recipient_resolver = Arc::new(resolver); + + let remote_ip = IpAddr::from_str("1.1.1.1").unwrap(); + let mut session = Session::new(remote_ip, stream, Arc::new(cfg)); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); + + // Make sure the agent gets the correct mail + assert_eq!( + agent.0.lock().unwrap().clone(), + Some(EmailMessage { + id: Uuid::nil(), + session_id: Uuid::nil(), + remote_ip, + ehlo_hostname: fqdn!("foo.bar"), + mail_from: "foo@bar".try_into().unwrap(), + rcpt_to: vec!["bar@baz".try_into().unwrap()], + body: "012345678998765432100123456789".into(), + }) + ); +} + +#[tokio::test] +async fn test_data() { + let mut builder = create_basic_stream(); + stream_send_message(&mut builder); + + let stream = builder + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let agent = Arc::new(TestDeliveryAgent::default()); + let resolver = TestRecipientResolver( + "dead@beef".try_into().unwrap(), + Some("bar@baz".try_into().unwrap()), + None, + ); + + let mut cfg = SessionConfig::new("test", 512); + cfg.delivery_agent = agent.clone(); + cfg.recipient_resolver = Arc::new(resolver); + + let remote_ip = IpAddr::from_str("1.1.1.1").unwrap(); + let mut session = Session::new(remote_ip, stream, Arc::new(cfg)); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); + + // Make sure the agent gets the correct mail + assert_eq!( + agent.0.lock().unwrap().clone(), + Some(EmailMessage { + id: Uuid::nil(), + session_id: Uuid::nil(), + remote_ip, + ehlo_hostname: fqdn!("foo.bar"), + mail_from: email!("foo@bar"), + rcpt_to: vec![email!("bar@baz")], + body: "foobarmessage".into(), + }) + ) +} + +#[tokio::test] +async fn test_expand() { + let mut builder = create_basic_stream(); + stream_send_message(&mut builder); + + let stream = builder + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let agent = Arc::new(TestDeliveryAgent::default()); + let resolver = TestRecipientResolver( + "dead@beef".try_into().unwrap(), + None, + Some(vec![ + "dead@dead".try_into().unwrap(), + "bar@bax".try_into().unwrap(), + ]), + ); + + let mut cfg = SessionConfig::new("test", 512); + cfg.delivery_agent = agent.clone(); + cfg.recipient_resolver = Arc::new(resolver); + + let remote_ip = IpAddr::from_str("1.1.1.1").unwrap(); + let mut session = Session::new(remote_ip, stream, Arc::new(cfg)); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); + + // Make sure the agent gets the correct mail + assert_eq!( + agent.0.lock().unwrap().clone(), + Some(EmailMessage { + id: Uuid::nil(), + session_id: Uuid::nil(), + remote_ip, + ehlo_hostname: fqdn!("foo.bar"), + mail_from: email!("foo@bar"), + rcpt_to: vec![email!("dead@beef"), email!("dead@dead"), email!("bar@bax"),], + body: "foobarmessage".into(), + }) + ) +} + +#[tokio::test] +async fn test_max_recipients() { + let stream = create_basic_stream() + .read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"455 4.5.3 Too many recipients.\r\n") + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_max_message_size() { + let stream = create_basic_stream() + .read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"DATA\r\n") + .write(b"354 Start mail input; end with .\r\n") + .read(format!("{}\r\n.\r\n", "1".repeat(513)).as_bytes()) + .write(b"552 5.3.4 Message too big, we accept up to 512 bytes.\r\n") + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_max_messages_per_session() { + let mut builder = create_basic_stream(); + stream_send_message(&mut builder); + stream_send_message(&mut builder); + stream_send_message(&mut builder); + + let stream = builder + .read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"DATA\r\n") + .write(b"452 4.4.5 Maximum number of messages per session exceeded.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::TooManyMessagesPerSession + )); +} + +#[tokio::test] +async fn test_max_errors() { + let stream = tokio_test::io::Builder::new() + .write(b"220 test ESMTP IC SMTP Gateway\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"452 4.3.2 Too many errors.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::TooManyErrors + )); +} + +#[tokio::test] +async fn test_request_too_large() { + let stream = tokio_test::io::Builder::new() + .write(b"220 test ESMTP IC SMTP Gateway\r\n") + .read(format!("EHLO {}", "1".repeat(2048)).as_bytes()) + .read(format!("{}\r\n", "1".repeat(2048)).as_bytes()) + .write(b"554 5.3.4 Line is too long.\r\n") + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_max_session_transfer_quota() { + let stream = tokio_test::io::Builder::new() + .write(b"220 test ESMTP IC SMTP Gateway\r\n") + .read(format!("EHLO {}\r\n", "1".repeat(8192)).as_bytes()) + .write(b"452 4.7.28 Session transfer quota exceeded.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::TransferQuotaExceeded(_) + )); +} + +#[tokio::test] +async fn test_starttls() { + rustls::crypto::ring::default_provider() + .install_default() + .ok(); + + // Use an in-memory pipe + let (stream1, mut stream2) = duplex(8192); + + let rustls_server_cfg = ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(Arc::new( + StubResolver::new(TEST_CERT_1.as_bytes(), TEST_KEY_1.as_bytes()).unwrap(), + )); + + let mut cfg = SessionConfig::new("test", 512); + cfg.tls_mode = SessionTlsMode::Required(Arc::new(rustls_server_cfg)); + + tokio::spawn(async move { + SessionManager::handle_connection( + stream1, + SocketAddr::from_str("1.1.1.1:123").unwrap(), + Arc::new(cfg), + CancellationToken::new(), + ) + .await; + }); + + let mut buf = vec![0; 8192]; + + let r = stream2.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"220 test ESMTP IC SMTP Gateway\r\n"); + + // Make sure EHLO advertises 250-STARTTLS + stream2.write_all(b"EHLO foo.bar\r\n").await.unwrap(); + let r = stream2.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"250-test you had me at EHLO\r\n250-STARTTLS\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); + + // Make sure TLS is required by the server due to SessionTlsMode::Required + stream2.write_all(b"MAIL FROM:\r\n").await.unwrap(); + let r = stream2.read(&mut buf).await.unwrap(); + assert_eq!( + &buf[..r], + b"503 5.5.1 TLS is required to submit mail on this server.\r\n" + ); + + // Fire up TLS handshake + stream2.write_all(b"STARTTLS\r\n").await.unwrap(); + let r = stream2.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"220 2.0.0 Ready to start TLS.\r\n"); + + let rustls_client_cfg = ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoopServerCertVerifier::default())) + .with_no_client_auth(); + let tls_connector = TlsConnector::from(Arc::new(rustls_client_cfg)); + let mut tls_stream = tls_connector + .connect(ServerName::try_from("foo").unwrap(), stream2) + .await + .unwrap(); + + // Make sure there's no 250-STARTTLS and 250-REQUIRETLS in EHLO anymore inside TLS session + tls_stream.write_all(b"EHLO foo.bar\r\n").await.unwrap(); + let r = tls_stream.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"250-test you had me at EHLO\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); + + // No TLS-in-TLS allowed + tls_stream.write_all(b"STARTTLS\r\n").await.unwrap(); + let r = tls_stream.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"504 5.7.4 Already in TLS mode.\r\n"); + + // Now MAIL FROM should work + tls_stream.write_all(b"MAIL FROM:\r\n").await.unwrap(); + let r = tls_stream.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"250 2.1.0 OK\r\n"); + + // All good + tls_stream.write_all(b"QUIT\r\n").await.unwrap(); + let r = tls_stream.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"221 2.0.0 Bye.\r\n"); +} + +#[tokio::test] +async fn test_with_smtp_client() { + rustls::crypto::ring::default_provider() + .install_default() + .ok(); + + let rustls_server_cfg = ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(Arc::new( + StubResolver::new(TEST_CERT_1.as_bytes(), TEST_KEY_1.as_bytes()).unwrap(), + )); + + let agent = Arc::new(TestDeliveryAgent::default()); + let notification_handler = Arc::new(TestNotificationsReceiver::default()); + + // Listen on the random free port + let listener = listen_tcp("127.0.0.1:0".parse().unwrap(), ListenerOpts::default()).unwrap(); + let port = listener.local_addr().unwrap().port(); + + let mut cfg = SessionConfig::new("test", 10 * 1024 * 1024); + cfg.delivery_agent = agent.clone(); + cfg.tls_mode = SessionTlsMode::Allowed(Arc::new(rustls_server_cfg)); + cfg.notifications_handler = Some(notification_handler.clone()); + + let token = CancellationToken::new(); + let token_child = token.child_token(); + let server = Server::new_with_listener(listener, cfg).unwrap(); + let server_handle = tokio::spawn(async move { + server.serve(token_child).await.unwrap(); + }); + + let message = MessageBuilder::new() + .from(("John Doe", "john@doe.com")) + .to(("Jane Doe", "jane@doe.com")) + .subject("Hello") + .text_body("Blah"); + + let mut client = SmtpClientBuilder::new("127.0.0.1", port) + .unwrap() + .implicit_tls(false) + .helo_host("foo.bar") + .allow_invalid_certs() + .connect() + .await + .unwrap(); + + // Try some commands + client.noop().await.unwrap(); + client.rset().await.unwrap(); + + // Make sure we have the required caps + let caps = client.capabilities("foo.bar", false).await.unwrap(); + assert_eq!( + caps.capabilities, + EXT_SMTP_UTF8 + | EXT_8BIT_MIME + | EXT_CHUNKING + | EXT_ENHANCED_STATUS_CODES + | EXT_SIZE + | EXT_PIPELINING + ); + + client.send(message).await.unwrap(); + client.quit().await.unwrap(); + + // Make sure the agent gets the correct mail + let msg = agent.0.lock().unwrap().clone().unwrap(); + assert_eq!(msg.id, Uuid::nil()); + assert_eq!(msg.ehlo_hostname, "foo.bar"); + assert_eq!(msg.mail_from, "john@doe.com"); + assert_eq!(msg.rcpt_to, vec!["jane@doe.com"]); + + let parsed = MessageParser::new().parse(&msg.body).unwrap(); + assert_eq!(parsed.subject(), Some("Hello")); + assert_eq!( + *parsed.from().unwrap(), + Address::List(vec![Addr::new(Some("John Doe"), "john@doe.com")]) + ); + assert_eq!( + *parsed.to().unwrap(), + Address::List(vec![Addr::new(Some("Jane Doe"), "jane@doe.com")]) + ); + assert_eq!(parsed.body_text(0).unwrap(), "Blah"); + + // Shutdown the server + token.cancel(); + server_handle.await.unwrap(); + + // Check notifications + let (msg, error) = notification_handler.msg.lock().unwrap().clone().unwrap(); + assert_eq!(msg.id, Uuid::nil()); + assert_eq!(msg.ehlo_hostname, "foo.bar"); + assert_eq!(msg.mail_from, "john@doe.com"); + assert_eq!(msg.rcpt_to, vec!["jane@doe.com"]); + assert!(error.is_none()); + + let r = notification_handler.sess.lock().unwrap().take().unwrap(); + assert_eq!(r.0, Uuid::nil()); + assert_eq!(r.1, IpAddr::from_str("127.0.0.1").unwrap()); + assert!(r.2.last_error.is_none()); + assert_eq!(r.4.unwrap().protocol, ProtocolVersion::TLSv1_3); + assert!(matches!(r.5, SessionError::Quit)); +} + +#[test] +fn test_session_error() { + let s: &'static str = SessionError::Quit.into(); + assert_eq!(s, "quit"); + + let s: &'static str = SessionError::SendsBeforeGreeting.into(); + assert_eq!(s, "sends_before_greeting"); + + let s: &'static str = SessionError::TooManyMessagesPerSession.into(); + assert_eq!(s, "too_many_messages_per_session"); +} diff --git a/ic-bn-lib/src/smtp/mod.rs b/ic-bn-lib/src/smtp/mod.rs index 1a1646d..0bff4d4 100644 --- a/ic-bn-lib/src/smtp/mod.rs +++ b/ic-bn-lib/src/smtp/mod.rs @@ -1,14 +1,21 @@ -use std::fmt::{Debug, Display}; +use std::{ + fmt::{Debug, Display}, + net::IpAddr, +}; use async_trait::async_trait; use bytes::Bytes; use fqdn::FQDN; +use ic_bn_lib_common::types::http::TlsInfo; use itertools::Itertools; -use strum::Display; +use strum::{Display, IntoStaticStr}; use tracing::warn; use uuid::Uuid; -use crate::smtp::address::EmailAddress; +use crate::smtp::{ + address::EmailAddress, + inbound::{SessionCounters, SessionData, SessionError}, +}; pub mod address; pub mod cli; @@ -49,10 +56,48 @@ pub enum DeliveryError { Permanent(String), } +/// Error that might happen during message vaildation or delivery +#[derive(thiserror::Error, Clone, Debug, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub enum MessageError { + #[error("Delivery failed: {0}")] + DeliveryFailed(#[from] DeliveryError), + #[error("Parsing failed")] + ParsingFailed, + #[error("Too many 'Received' headers")] + TooManyReceivedHeaders, + #[error("DKIM validation failed: {0}")] + DkimValidationFailed(String), +} + +/// Error that might happen during SMTP exchange +#[derive(thiserror::Error, Clone, Debug, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub enum ProtocolError { + #[error("Invalid EHLO hostname: {0}")] + InvalidEhloHostname(String), + #[error("Invalid command sequence: {0}")] + InvalidSequenceOfCommands(String), + #[error("Sender validation failed: {0}")] + SenderValidationFailed(String), + #[error("Recipient validation failed: {0}")] + RecipientValidationFailed(String), + #[error("Reverse IP validation failed: {0}")] + ReverseIpValidationFailed(String), + #[error("SPF validation failed: {0}")] + SpfValidationFailed(String), + #[error("Message too big: {0}")] + MessageTooBig(String), + #[error("SMTP protocol error: {0}")] + SmtpError(String), +} + /// Low-level E-Mail representation #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub struct EmailMessage { pub id: Uuid, + pub session_id: Uuid, + pub remote_ip: IpAddr, pub ehlo_hostname: FQDN, pub mail_from: EmailAddress, pub rcpt_to: Vec, @@ -63,8 +108,10 @@ impl Display for EmailMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "id: {}, ehlo: {}, from: {}, to: {}, msg: {}", + "id: {}, session_id: {}, remote_ip: {}, ehlo: {}, from: {}, to: {}, msg: {}", self.id, + self.session_id, + self.remote_ip, self.ehlo_hostname, self.mail_from, self.rcpt_to.iter().map(|x| x.to_string()).join(", "), @@ -85,6 +132,23 @@ pub trait ResolvesRecipient: Send + Sync + Debug { ) -> Result; } +/// Notifies about events +#[async_trait] +pub trait ReceivesNotifications: Send + Sync + Debug { + /// Notify when the message is queued + async fn notify_about_message(&self, message: EmailMessage, error: Option); + /// Notify when the session is finished + async fn notify_session_finish( + &self, + id: Uuid, + remote_ip: IpAddr, + data: SessionData, + counters: SessionCounters, + tls_info: Option, + error: SessionError, + ); +} + /// Delivers the E-Mail message #[async_trait] pub trait DeliversMail: Send + Sync + Debug { From d3e3e3d477bd18dc44c68be73d1cf8884b246761 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Mon, 1 Jun 2026 09:11:12 +0000 Subject: [PATCH 4/9] export rcgen, use aws-lc-rs --- ic-bn-lib/src/lib.rs | 1 + ic-bn-lib/src/smtp/inbound/tests.rs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ic-bn-lib/src/lib.rs b/ic-bn-lib/src/lib.rs index 44235ef..b4dc36c 100644 --- a/ic-bn-lib/src/lib.rs +++ b/ic-bn-lib/src/lib.rs @@ -37,6 +37,7 @@ pub use ic_bn_lib_common; #[cfg(feature = "smtp")] pub use mail_auth; pub use prometheus; +pub use rcgen; pub use reqwest; pub use rustls; #[cfg(feature = "acme-alpn")] diff --git a/ic-bn-lib/src/smtp/inbound/tests.rs b/ic-bn-lib/src/smtp/inbound/tests.rs index f5464ef..881aa15 100644 --- a/ic-bn-lib/src/smtp/inbound/tests.rs +++ b/ic-bn-lib/src/smtp/inbound/tests.rs @@ -515,7 +515,7 @@ async fn test_max_session_transfer_quota() { #[tokio::test] async fn test_starttls() { - rustls::crypto::ring::default_provider() + rustls::crypto::aws_lc_rs::default_provider() .install_default() .ok(); @@ -597,7 +597,7 @@ async fn test_starttls() { #[tokio::test] async fn test_with_smtp_client() { - rustls::crypto::ring::default_provider() + rustls::crypto::aws_lc_rs::default_provider() .install_default() .ok(); From 1ba90815f1932ccc15938dfb3b9d9a8c485f30ab Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Mon, 1 Jun 2026 10:57:36 +0000 Subject: [PATCH 5/9] Track session start time --- ic-bn-lib/src/smtp/inbound/mod.rs | 8 ++++---- ic-bn-lib/src/smtp/inbound/session.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ic-bn-lib/src/smtp/inbound/mod.rs b/ic-bn-lib/src/smtp/inbound/mod.rs index d6d5ac1..b0785d1 100644 --- a/ic-bn-lib/src/smtp/inbound/mod.rs +++ b/ic-bn-lib/src/smtp/inbound/mod.rs @@ -257,16 +257,16 @@ impl Default for SessionData { /// SMTP session counters #[derive(Clone, Debug)] pub struct SessionCounters { - pub valid_until: Instant, + pub started: Instant, pub bytes_ingested: usize, pub messages_queued: usize, pub errors: usize, } impl SessionCounters { - fn new(ttl: Duration) -> Self { + fn new() -> Self { Self { - valid_until: Instant::now() + ttl, + started: Instant::now(), bytes_ingested: 0, messages_queued: 0, errors: 0, @@ -308,7 +308,7 @@ impl Session { stream, state: SessionState::Greeting, data: SessionData::default(), - counters: SessionCounters::new(cfg.max_session_duration), + counters: SessionCounters::new(), cfg, tls_info: None, } diff --git a/ic-bn-lib/src/smtp/inbound/session.rs b/ic-bn-lib/src/smtp/inbound/session.rs index d97e7d6..114e1ab 100644 --- a/ic-bn-lib/src/smtp/inbound/session.rs +++ b/ic-bn-lib/src/smtp/inbound/session.rs @@ -222,7 +222,7 @@ impl Session { } // Check if we are over session time quota - if Instant::now() > self.counters.valid_until { + if Instant::now() > self.counters.started + self.cfg.max_session_duration { self.reply("452", "4.3.2", "Session open for too long.") .await?; return Err(SessionError::TtlExceeded( From a69a40750f19242aabb487637eae14ab834344aa Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Mon, 1 Jun 2026 11:36:09 +0000 Subject: [PATCH 6/9] Add error handler, make set_error async --- ic-bn-lib/src/smtp/inbound/ehlo.rs | 14 ++++--- ic-bn-lib/src/smtp/inbound/mail_from.rs | 33 ++++++++++----- ic-bn-lib/src/smtp/inbound/manager.rs | 17 ++------ ic-bn-lib/src/smtp/inbound/mod.rs | 46 ++++++++++++++++---- ic-bn-lib/src/smtp/inbound/rcpt_to.rs | 12 ++++-- ic-bn-lib/src/smtp/inbound/session.rs | 33 ++++++++++----- ic-bn-lib/src/smtp/inbound/tests.rs | 56 +++++++++++-------------- ic-bn-lib/src/smtp/mod.rs | 20 ++++----- 8 files changed, 137 insertions(+), 94 deletions(-) diff --git a/ic-bn-lib/src/smtp/inbound/ehlo.rs b/ic-bn-lib/src/smtp/inbound/ehlo.rs index 4f3f222..9b4f1e8 100644 --- a/ic-bn-lib/src/smtp/inbound/ehlo.rs +++ b/ic-bn-lib/src/smtp/inbound/ehlo.rs @@ -20,7 +20,8 @@ impl Session { info!("{self}: {host}: Invalid EHLO hostname"); self.set_error(ProtocolError::InvalidEhloHostname(format!( "{host}: incorrect hostname" - ))); + ))) + .await; return self.reply("550", "5.5.0", "Invalid EHLO hostname.").await; }; @@ -36,7 +37,8 @@ impl Session { info!("{self}: {host}: EHLO is not FQDN"); self.set_error(ProtocolError::InvalidEhloHostname(format!( "{host}: not FQDN" - ))); + ))) + .await; return self .reply("550", "5.5.0", "EHLO hostname must be an FQDN.") .await; @@ -53,7 +55,8 @@ impl Session { info!("{self}: {host}: EHLO not found in DNS"); self.set_error(ProtocolError::InvalidEhloHostname(format!( "{host}: not found in DNS" - ))); + ))) + .await; return self .reply("550", "5.5.0", "EHLO hostname not found in DNS.") .await; @@ -65,8 +68,9 @@ impl Session { if is_error_negative_lookup(&e) { self.set_error(ProtocolError::InvalidEhloHostname(format!( - "{host}: not found in DNS" - ))); + "{host}: not found in DNS: {e:#}" + ))) + .await; return self .reply("550", "5.5.0", "EHLO hostname not found in DNS.") diff --git a/ic-bn-lib/src/smtp/inbound/mail_from.rs b/ic-bn-lib/src/smtp/inbound/mail_from.rs index f9b9863..780f104 100644 --- a/ic-bn-lib/src/smtp/inbound/mail_from.rs +++ b/ic-bn-lib/src/smtp/inbound/mail_from.rs @@ -25,7 +25,8 @@ impl Session { let Some(helo_hostname) = self.data.ehlo_hostname.as_ref().map(|x| x.to_string()) else { self.set_error(ProtocolError::InvalidSequenceOfCommands( "MAIL FROM before EHLO".into(), - )); + )) + .await; return self .reply("503", "5.5.1", "Polite people say EHLO first.") .await; @@ -34,7 +35,8 @@ impl Session { if self.data.mail_from.is_some() { self.set_error(ProtocolError::InvalidSequenceOfCommands( "Multiple MAIL FROM".into(), - )); + )) + .await; return self .reply( "503", @@ -66,7 +68,8 @@ impl Session { self.set_error(ProtocolError::MessageTooBig(format!( "MAIL FROM-specified size is too big: {} > {}", from.size, self.cfg.max_message_size - ))); + ))) + .await; return self.message_too_big().await; } @@ -84,7 +87,8 @@ impl Session { self.set_error(ProtocolError::SenderValidationFailed(format!( "Incorrect sender address: {}", from.address - ))); + ))) + .await; return self .reply("550", "5.7.1", "Sender address is incorrect.") .await; @@ -106,7 +110,8 @@ impl Session { self.set_error(ProtocolError::SenderValidationFailed(format!( "Sender domain is not FQDN: {}", address.domain() - ))); + ))) + .await; return self.reply("550", "5.7.2", "Sender must be an FQDN.").await; }; @@ -124,7 +129,8 @@ impl Session { ); self.set_error(ProtocolError::SenderValidationFailed( "No MX records found".into(), - )); + )) + .await; return self .reply( "550", @@ -141,7 +147,8 @@ impl Session { ); self.set_error(ProtocolError::SenderValidationFailed( "No MX records found".into(), - )); + )) + .await; return self .reply( "550", @@ -155,7 +162,8 @@ impl Session { ); self.set_error(ProtocolError::SenderValidationFailed(format!( "Sender domain verification temporary error: {e:#}", - ))); + ))) + .await; return self .reply("451", "4.7.25", "Temporary error validating sender domain.") .await; @@ -188,7 +196,8 @@ impl Session { self.set_error(ProtocolError::SpfValidationFailed(format!( "SPF validation temporary error: {:?}", output.explanation() - ))); + ))) + .await; return self .reply("451", "4.7.24", "Temporary SPF validation error.") .await; @@ -201,7 +210,8 @@ impl Session { self.set_error(ProtocolError::SpfValidationFailed(format!( "SPF validation permanent error: {:?}", output.explanation() - ))); + ))) + .await; return self .reply_with("550", "5.7.23", |buf| { write!(buf, "SPF validation failed")?; @@ -225,7 +235,8 @@ impl Session { /// Replies about failed reverse IP verification async fn verify_reverse_ip_reply(&mut self, permanent: bool, msg: &str) -> SessionResult { - self.set_error(ProtocolError::ReverseIpValidationFailed(msg.into())); + self.set_error(ProtocolError::ReverseIpValidationFailed(msg.into())) + .await; // Emit permanent errors only if in strict mode if permanent && self.cfg.verify_reverse_ip_strict { diff --git a/ic-bn-lib/src/smtp/inbound/manager.rs b/ic-bn-lib/src/smtp/inbound/manager.rs index 2b231de..80e3466 100644 --- a/ic-bn-lib/src/smtp/inbound/manager.rs +++ b/ic-bn-lib/src/smtp/inbound/manager.rs @@ -87,15 +87,7 @@ impl SessionManager { async fn notify(session: Session, error: SessionError) { if let Some(v) = &session.cfg.notifications_handler { - v.notify_session_finish( - session.id, - session.remote_ip, - session.data, - session.counters, - session.tls_info, - error, - ) - .await; + v.notify_session_finish(session.meta(), error).await; } } } @@ -112,6 +104,7 @@ impl Session { } }; + let meta = self.meta(); let (stream, tls_info) = match tls_handshake(tls_config, self.stream).await { Ok(v) => v, Err(e) => { @@ -120,11 +113,7 @@ impl Session { // Session is partially consumed by `tls_handshake`, so we can't use `Manager::notify()` if let Some(v) = &self.cfg.notifications_handler { v.notify_session_finish( - self.id, - self.remote_ip, - self.data, - self.counters, - self.tls_info, + meta, SessionError::TlsHandshakeFailed(error_str.clone()), ) .await; diff --git a/ic-bn-lib/src/smtp/inbound/mod.rs b/ic-bn-lib/src/smtp/inbound/mod.rs index b0785d1..d806a65 100644 --- a/ic-bn-lib/src/smtp/inbound/mod.rs +++ b/ic-bn-lib/src/smtp/inbound/mod.rs @@ -274,16 +274,30 @@ impl SessionCounters { } } -/// SMTP Session -pub struct Session { +/// Session metadata for logging/notification purposes +#[derive(Clone, Debug)] +pub struct SessionMeta { pub id: Uuid, + pub message_id: Uuid, pub remote_ip: IpAddr, + pub tls_info: Option, + pub counters: SessionCounters, + pub last_error: Option, + pub ehlo_hostname: Option, + pub mail_from: Option, + pub rcpt_to: Vec, +} + +/// SMTP Session +pub struct Session { + id: Uuid, + remote_ip: IpAddr, stream: S, state: SessionState, - pub data: SessionData, - pub counters: SessionCounters, + data: SessionData, + counters: SessionCounters, cfg: Arc, - pub tls_info: Option, + tls_info: Option, } impl Display for Session { @@ -314,9 +328,27 @@ impl Session { } } - fn set_error(&mut self, error: ProtocolError) { - self.data.last_error = Some(error); + async fn set_error(&mut self, error: ProtocolError) { + self.data.last_error = Some(error.clone()); self.counters.errors += 1; + + if let Some(v) = &self.cfg.notifications_handler { + v.notify_protocol_error(self.meta(), error).await; + } + } + + fn meta(&self) -> SessionMeta { + SessionMeta { + id: self.id, + message_id: self.data.message_id, + remote_ip: self.remote_ip, + tls_info: self.tls_info.clone(), + counters: self.counters.clone(), + last_error: self.data.last_error.clone(), + ehlo_hostname: self.data.ehlo_hostname.clone(), + mail_from: self.data.mail_from.clone(), + rcpt_to: self.data.rcpt_to.clone(), + } } } diff --git a/ic-bn-lib/src/smtp/inbound/rcpt_to.rs b/ic-bn-lib/src/smtp/inbound/rcpt_to.rs index 736d17d..c6ff10d 100644 --- a/ic-bn-lib/src/smtp/inbound/rcpt_to.rs +++ b/ic-bn-lib/src/smtp/inbound/rcpt_to.rs @@ -21,7 +21,8 @@ impl Session { let Some(mail_from) = &self.data.mail_from else { self.set_error(ProtocolError::InvalidSequenceOfCommands( "RCPT TO before MAIL FROM".into(), - )); + )) + .await; return self .reply("503", "5.5.1", "MAIL FROM is required first.") .await; @@ -41,7 +42,8 @@ impl Session { self.set_error(ProtocolError::RecipientValidationFailed(format!( "Incorrect address: {}", to.address - ))); + ))) + .await; return self.reply("550", "5.1.2", "Incorrect address.").await; }; @@ -55,7 +57,8 @@ impl Session { "Too many recipients: {} > {}", self.data.rcpt_to.len(), self.cfg.max_recipients - ))); + ))) + .await; return self.reply("455", "4.5.3", "Too many recipients.").await; } @@ -86,7 +89,8 @@ impl Session { info!("{self}: {}: recipient resolution error: {e:#}", to.address); self.set_error(ProtocolError::RecipientValidationFailed(format!( "Recipient resolution failed: {e:#}", - ))); + ))) + .await; return match e { RecipientResolveError::UnknownDomain => { diff --git a/ic-bn-lib/src/smtp/inbound/session.rs b/ic-bn-lib/src/smtp/inbound/session.rs index 114e1ab..ce1f985 100644 --- a/ic-bn-lib/src/smtp/inbound/session.rs +++ b/ic-bn-lib/src/smtp/inbound/session.rs @@ -94,7 +94,8 @@ impl Session { "{} > {}", self.data.message.len(), max_size - ))); + ))) + .await; self.reply_with("552", "5.3.4", |buf| { write!(buf, "Message too big, we accept up to {max_size} bytes.",) }) @@ -130,7 +131,8 @@ impl Session { } async fn handle_error(&mut self, error: SmtpError) -> SessionResult<()> { - self.set_error(ProtocolError::SmtpError(error.to_string())); + self.set_error(ProtocolError::SmtpError(error.to_string())) + .await; let (code, ext, msg) = match error { SmtpError::UnknownCommand | SmtpError::InvalidResponse { .. } => { @@ -201,7 +203,8 @@ impl Session { self.reply("250", "2.0.0", "OK").await?; } _ => { - self.set_error(ProtocolError::SmtpError("Command not implemented".into())); + self.set_error(ProtocolError::SmtpError("Command not implemented".into())) + .await; self.reply("502", "5.5.1", "Command not implemented.") .await?; } @@ -291,12 +294,14 @@ impl Session { if self.tls_info.is_some() { self.set_error(ProtocolError::InvalidSequenceOfCommands( "STARTTLS inside STARTTLS".into(), - )); + )) + .await; self.reply("504", "5.7.4", "Already in TLS mode.").await?; } else if !self.cfg.tls_mode.enabled() { self.set_error(ProtocolError::InvalidSequenceOfCommands( "STARTTLS without TLS enabled".into(), - )); + )) + .await; self.reply("502", "5.7.0", "TLS not available.").await?; } else { self.reply("220", "2.0.0", "Ready to start TLS.").await?; @@ -356,7 +361,8 @@ impl Session { SessionState::RequestTooLarge(rx) => { // If line-feed found - issue error, otherwise keep ingesting if rx.ingest(&mut iter) { - self.set_error(ProtocolError::SmtpError("Line is too long".into())); + self.set_error(ProtocolError::SmtpError("Line is too long".into())) + .await; self.reply("554", "5.3.4", "Line is too long.").await?; state = SessionState::default(); } else { @@ -550,7 +556,7 @@ impl Session { // Run configured verification steps on the message body if let Some(e) = self.verify_message(&msg).await? { if let Some(v) = &self.cfg.notifications_handler { - v.notify_about_message(msg.clone(), Some(e)).await; + v.notify_message(self.meta(), msg.clone(), Some(e)).await; } self.reset_message(); @@ -566,8 +572,12 @@ impl Session { ); if let Some(v) = &self.cfg.notifications_handler { - v.notify_about_message(msg.clone(), Some(MessageError::DeliveryFailed(e.clone()))) - .await; + v.notify_message( + self.meta(), + msg.clone(), + Some(MessageError::DeliveryFailed(e.clone())), + ) + .await; } self.reset_message(); @@ -588,7 +598,7 @@ impl Session { } if let Some(v) = &self.cfg.notifications_handler { - v.notify_about_message(msg.clone(), None).await; + v.notify_message(self.meta(), msg.clone(), None).await; } info!( @@ -617,7 +627,8 @@ impl Session { } else if self.data.rcpt_to.is_empty() { self.set_error(ProtocolError::InvalidSequenceOfCommands( "DATA before RCPT TO".into(), - )); + )) + .await; self.reply("503", "5.5.1", "RCPT TO is required first.") .await?; return Ok(false); diff --git a/ic-bn-lib/src/smtp/inbound/tests.rs b/ic-bn-lib/src/smtp/inbound/tests.rs index 881aa15..ec5b1c7 100644 --- a/ic-bn-lib/src/smtp/inbound/tests.rs +++ b/ic-bn-lib/src/smtp/inbound/tests.rs @@ -41,35 +41,28 @@ impl DeliversMail for TestDeliveryAgent { #[allow(clippy::type_complexity)] #[derive(Debug, Default)] pub struct TestNotificationsReceiver { - msg: Mutex)>>, - sess: Mutex< - Option<( - Uuid, - IpAddr, - SessionData, - SessionCounters, - Option, - SessionError, - )>, - >, + msg: Mutex)>>, + sess: Mutex>, + proto_error: Mutex>, } #[async_trait] impl ReceivesNotifications for TestNotificationsReceiver { - async fn notify_about_message(&self, message: EmailMessage, error: Option) { - *self.msg.lock().unwrap() = Some((message, error)); - } - - async fn notify_session_finish( + async fn notify_message( &self, - id: Uuid, - remote_ip: IpAddr, - data: SessionData, - counters: SessionCounters, - tls_info: Option, - error: SessionError, + meta: SessionMeta, + message: EmailMessage, + error: Option, ) { - *self.sess.lock().unwrap() = Some((id, remote_ip, data, counters, tls_info, error)); + *self.msg.lock().unwrap() = Some((meta, message, error)); + } + + async fn notify_protocol_error(&self, meta: SessionMeta, error: ProtocolError) { + *self.proto_error.lock().unwrap() = Some((meta, error)); + } + + async fn notify_session_finish(&self, meta: SessionMeta, error: SessionError) { + *self.sess.lock().unwrap() = Some((meta, error)); } } @@ -684,19 +677,20 @@ async fn test_with_smtp_client() { server_handle.await.unwrap(); // Check notifications - let (msg, error) = notification_handler.msg.lock().unwrap().clone().unwrap(); - assert_eq!(msg.id, Uuid::nil()); + let (meta, msg, error) = notification_handler.msg.lock().unwrap().clone().unwrap(); + assert_eq!(meta.id, Uuid::nil()); + assert_eq!(meta.message_id, Uuid::nil()); assert_eq!(msg.ehlo_hostname, "foo.bar"); assert_eq!(msg.mail_from, "john@doe.com"); assert_eq!(msg.rcpt_to, vec!["jane@doe.com"]); assert!(error.is_none()); - let r = notification_handler.sess.lock().unwrap().take().unwrap(); - assert_eq!(r.0, Uuid::nil()); - assert_eq!(r.1, IpAddr::from_str("127.0.0.1").unwrap()); - assert!(r.2.last_error.is_none()); - assert_eq!(r.4.unwrap().protocol, ProtocolVersion::TLSv1_3); - assert!(matches!(r.5, SessionError::Quit)); + let (meta, error) = notification_handler.sess.lock().unwrap().take().unwrap(); + assert_eq!(meta.id, Uuid::nil()); + assert_eq!(meta.remote_ip, IpAddr::from_str("127.0.0.1").unwrap()); + assert!(meta.last_error.is_none()); + assert_eq!(meta.tls_info.unwrap().protocol, ProtocolVersion::TLSv1_3); + assert!(matches!(error, SessionError::Quit)); } #[test] diff --git a/ic-bn-lib/src/smtp/mod.rs b/ic-bn-lib/src/smtp/mod.rs index 0bff4d4..27578ed 100644 --- a/ic-bn-lib/src/smtp/mod.rs +++ b/ic-bn-lib/src/smtp/mod.rs @@ -6,7 +6,6 @@ use std::{ use async_trait::async_trait; use bytes::Bytes; use fqdn::FQDN; -use ic_bn_lib_common::types::http::TlsInfo; use itertools::Itertools; use strum::{Display, IntoStaticStr}; use tracing::warn; @@ -14,7 +13,7 @@ use uuid::Uuid; use crate::smtp::{ address::EmailAddress, - inbound::{SessionCounters, SessionData, SessionError}, + inbound::{SessionError, SessionMeta}, }; pub mod address; @@ -136,17 +135,16 @@ pub trait ResolvesRecipient: Send + Sync + Debug { #[async_trait] pub trait ReceivesNotifications: Send + Sync + Debug { /// Notify when the message is queued - async fn notify_about_message(&self, message: EmailMessage, error: Option); - /// Notify when the session is finished - async fn notify_session_finish( + async fn notify_message( &self, - id: Uuid, - remote_ip: IpAddr, - data: SessionData, - counters: SessionCounters, - tls_info: Option, - error: SessionError, + meta: SessionMeta, + message: EmailMessage, + error: Option, ); + /// Notify when the protocol error happens + async fn notify_protocol_error(&self, meta: SessionMeta, error: ProtocolError); + /// Notify when the session is finished + async fn notify_session_finish(&self, meta: SessionMeta, error: SessionError); } /// Delivers the E-Mail message From afe3d6051af4002d63f13c1162862aa02682d50e Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Mon, 1 Jun 2026 11:47:39 +0000 Subject: [PATCH 7/9] Remove session info from EmailMessage, push SessionMeta instead --- ic-bn-lib/src/smtp/ic/delivery_agent.rs | 35 +++++++++++++++++------ ic-bn-lib/src/smtp/inbound/mod.rs | 2 +- ic-bn-lib/src/smtp/inbound/session.rs | 37 +++++++++++++------------ ic-bn-lib/src/smtp/inbound/tests.rs | 18 ++++-------- ic-bn-lib/src/smtp/mod.rs | 26 ++++++++--------- 5 files changed, 64 insertions(+), 54 deletions(-) diff --git a/ic-bn-lib/src/smtp/ic/delivery_agent.rs b/ic-bn-lib/src/smtp/ic/delivery_agent.rs index 3b83f92..fd90659 100644 --- a/ic-bn-lib/src/smtp/ic/delivery_agent.rs +++ b/ic-bn-lib/src/smtp/ic/delivery_agent.rs @@ -22,6 +22,7 @@ use crate::{ candid::{Envelope, SmtpRequest, SmtpResponse}, parse_email, }, + inbound::SessionMeta, }, }; @@ -242,10 +243,14 @@ impl IcSmtpDeliveryAgent { #[async_trait] impl DeliversMail for IcSmtpDeliveryAgent { - async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError> { + async fn deliver_mail( + &self, + meta: SessionMeta, + message: EmailMessage, + ) -> Result<(), DeliveryError> { info!( - "{self}: delivering mail, ehlo: '{}', from: '{}', to: '{:?}', id '{}'", - message.ehlo_hostname, message.mail_from, message.rcpt_to, message.id + "{self}: delivering mail, ehlo: {:?}, from: '{}', to: '{:?}', id '{}'", + meta.ehlo_hostname, message.mail_from, message.rcpt_to, message.id ); // A single message can be (potentially) destined for several canisters/domains. @@ -369,7 +374,10 @@ mod tests { use crate::{ email, - smtp::ic::candid::{Header, Message, SmtpOk, SmtpRequestError}, + smtp::{ + ic::candid::{Header, Message, SmtpOk, SmtpRequestError}, + inbound::SessionCounters, + }, }; use super::*; @@ -610,9 +618,6 @@ mod tests { let message = EmailMessage { id: Uuid::nil(), - session_id: Uuid::nil(), - remote_ip: IpAddr::from_str("1.1.1.1").unwrap(), - ehlo_hostname: fqdn!("foo.bar"), mail_from: email!("john@doe.com"), rcpt_to: vec![ // these two go to qoctq-giaaa-aaaaa-aaaea-cai as a single mail @@ -624,7 +629,21 @@ mod tests { body: message.as_bytes().into(), }; - delivery_agent.deliver_mail(message.clone()).await.unwrap(); + let meta = SessionMeta { + id: Uuid::nil(), + message_id: Uuid::nil(), + remote_ip: IpAddr::from_str("1.1.1.1").unwrap(), + tls_info: None, + ehlo_hostname: None, + counters: SessionCounters::new(), + last_error: None, + mail_from: None, + rcpt_to: vec![], + }; + delivery_agent + .deliver_mail(meta, message.clone()) + .await + .unwrap(); let body = indoc! {r#" --XXXXboundary text diff --git a/ic-bn-lib/src/smtp/inbound/mod.rs b/ic-bn-lib/src/smtp/inbound/mod.rs index d806a65..531b029 100644 --- a/ic-bn-lib/src/smtp/inbound/mod.rs +++ b/ic-bn-lib/src/smtp/inbound/mod.rs @@ -264,7 +264,7 @@ pub struct SessionCounters { } impl SessionCounters { - fn new() -> Self { + pub(crate) fn new() -> Self { Self { started: Instant::now(), bytes_ingested: 0, diff --git a/ic-bn-lib/src/smtp/inbound/session.rs b/ic-bn-lib/src/smtp/inbound/session.rs index ce1f985..a0fd9f9 100644 --- a/ic-bn-lib/src/smtp/inbound/session.rs +++ b/ic-bn-lib/src/smtp/inbound/session.rs @@ -444,8 +444,8 @@ impl Session { let Some(auth_message) = AuthenticatedMessage::parse(&msg.body) else { info!( - "{self}: {}: {} -> {:?}: message parsing failed", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{self}: {} -> {:?}: message parsing failed", + msg.mail_from, msg.rcpt_to ); self.reply("550", "5.7.7", "Failed to parse the message") .await?; @@ -454,8 +454,8 @@ impl Session { if auth_message.received_headers_count() > self.cfg.max_received_headers { info!( - "{self}: {}: {} -> {:?}: message verification failed: too many 'Received' headers", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{self}: {} -> {:?}: message verification failed: too many 'Received' headers", + msg.mail_from, msg.rcpt_to ); self.reply( "450", @@ -480,16 +480,16 @@ impl Session { { // If any of the signatures that failed with a temporary error - return temporary SMTP error code info!( - "{log_name}: {}: {} -> {:?}: DKIM verification temporary failure: {error}", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{log_name}: {} -> {:?}: DKIM verification temporary failure: {error}", + msg.mail_from, msg.rcpt_to ); self.reply("451", "4.7.20", "DKIM validation temporary failure.") .await } else { // Otherwise permanent info!( - "{log_name}: {}: {} -> {:?}: DKIM verification failure: {error}", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{log_name}: {} -> {:?}: DKIM verification failure: {error}", + msg.mail_from, msg.rcpt_to ); self.reply("550", "5.7.20", "DKIM validation failed.").await } @@ -525,8 +525,7 @@ impl Session { if signatures_passed > 0 { debug!( - "{log_name}: {}: {} -> {:?}: DKIM validation succeeded ({signatures_passed}/{} signatures passed)", - msg.ehlo_hostname, + "{log_name}: {} -> {:?}: DKIM validation succeeded ({signatures_passed}/{} signatures passed)", msg.mail_from, msg.rcpt_to, outputs.len() @@ -545,9 +544,6 @@ impl Session { // It's better to panic in tests if they are not. let msg = EmailMessage { id, - session_id: self.id, - remote_ip: self.remote_ip, - ehlo_hostname: self.data.ehlo_hostname.clone().unwrap(), mail_from: self.data.mail_from.take().unwrap(), rcpt_to: std::mem::take(&mut self.data.rcpt_to), body: std::mem::take(&mut self.data.message).into(), @@ -565,10 +561,15 @@ impl Session { // Deliver the message. // Message cloning is rather lightweight (body is Bytes) - if let Err(e) = self.cfg.delivery_agent.deliver_mail(msg.clone()).await { + if let Err(e) = self + .cfg + .delivery_agent + .deliver_mail(self.meta(), msg.clone()) + .await + { info!( - "{self}: {}: {} -> {:?}: message delivery failed: {e:#}", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{self}: {} -> {:?}: message delivery failed: {e:#}", + msg.mail_from, msg.rcpt_to ); if let Some(v) = &self.cfg.notifications_handler { @@ -602,8 +603,8 @@ impl Session { } info!( - "{self}: {}: {} -> {:?}: message ({message_size} bytes) queued with id {id}", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{self}: {} -> {:?}: message ({message_size} bytes) queued with id {id}", + msg.mail_from, msg.rcpt_to ); self.reply_with("250", "2.0.0", |buf| { write!(buf, "Message ({message_size} bytes) queued with id {id}") diff --git a/ic-bn-lib/src/smtp/inbound/tests.rs b/ic-bn-lib/src/smtp/inbound/tests.rs index ec5b1c7..6e3983f 100644 --- a/ic-bn-lib/src/smtp/inbound/tests.rs +++ b/ic-bn-lib/src/smtp/inbound/tests.rs @@ -1,7 +1,6 @@ use std::{net::SocketAddr, str::FromStr, sync::Mutex}; use async_trait::async_trait; -use fqdn::fqdn; use ic_bn_lib_common::types::http::ListenerOpts; use mail_parser::{Addr, Address, MessageParser}; use mail_send::{SmtpClientBuilder, mail_builder::MessageBuilder}; @@ -28,7 +27,11 @@ pub struct TestDeliveryAgent(Mutex>, Option) #[async_trait] impl DeliversMail for TestDeliveryAgent { - async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError> { + async fn deliver_mail( + &self, + _meta: SessionMeta, + message: EmailMessage, + ) -> Result<(), DeliveryError> { if let Some(e) = &self.1 { return Err(e.clone()); } @@ -268,9 +271,6 @@ async fn test_bdat() { agent.0.lock().unwrap().clone(), Some(EmailMessage { id: Uuid::nil(), - session_id: Uuid::nil(), - remote_ip, - ehlo_hostname: fqdn!("foo.bar"), mail_from: "foo@bar".try_into().unwrap(), rcpt_to: vec!["bar@baz".try_into().unwrap()], body: "012345678998765432100123456789".into(), @@ -312,9 +312,6 @@ async fn test_data() { agent.0.lock().unwrap().clone(), Some(EmailMessage { id: Uuid::nil(), - session_id: Uuid::nil(), - remote_ip, - ehlo_hostname: fqdn!("foo.bar"), mail_from: email!("foo@bar"), rcpt_to: vec![email!("bar@baz")], body: "foobarmessage".into(), @@ -359,9 +356,6 @@ async fn test_expand() { agent.0.lock().unwrap().clone(), Some(EmailMessage { id: Uuid::nil(), - session_id: Uuid::nil(), - remote_ip, - ehlo_hostname: fqdn!("foo.bar"), mail_from: email!("foo@bar"), rcpt_to: vec![email!("dead@beef"), email!("dead@dead"), email!("bar@bax"),], body: "foobarmessage".into(), @@ -656,7 +650,6 @@ async fn test_with_smtp_client() { // Make sure the agent gets the correct mail let msg = agent.0.lock().unwrap().clone().unwrap(); assert_eq!(msg.id, Uuid::nil()); - assert_eq!(msg.ehlo_hostname, "foo.bar"); assert_eq!(msg.mail_from, "john@doe.com"); assert_eq!(msg.rcpt_to, vec!["jane@doe.com"]); @@ -680,7 +673,6 @@ async fn test_with_smtp_client() { let (meta, msg, error) = notification_handler.msg.lock().unwrap().clone().unwrap(); assert_eq!(meta.id, Uuid::nil()); assert_eq!(meta.message_id, Uuid::nil()); - assert_eq!(msg.ehlo_hostname, "foo.bar"); assert_eq!(msg.mail_from, "john@doe.com"); assert_eq!(msg.rcpt_to, vec!["jane@doe.com"]); assert!(error.is_none()); diff --git a/ic-bn-lib/src/smtp/mod.rs b/ic-bn-lib/src/smtp/mod.rs index 27578ed..bf129c1 100644 --- a/ic-bn-lib/src/smtp/mod.rs +++ b/ic-bn-lib/src/smtp/mod.rs @@ -1,11 +1,7 @@ -use std::{ - fmt::{Debug, Display}, - net::IpAddr, -}; +use std::fmt::{Debug, Display}; use async_trait::async_trait; use bytes::Bytes; -use fqdn::FQDN; use itertools::Itertools; use strum::{Display, IntoStaticStr}; use tracing::warn; @@ -95,9 +91,6 @@ pub enum ProtocolError { #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub struct EmailMessage { pub id: Uuid, - pub session_id: Uuid, - pub remote_ip: IpAddr, - pub ehlo_hostname: FQDN, pub mail_from: EmailAddress, pub rcpt_to: Vec, pub body: Bytes, @@ -107,11 +100,8 @@ impl Display for EmailMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "id: {}, session_id: {}, remote_ip: {}, ehlo: {}, from: {}, to: {}, msg: {}", + "id: {}, from: {}, to: {}, msg: {}", self.id, - self.session_id, - self.remote_ip, - self.ehlo_hostname, self.mail_from, self.rcpt_to.iter().map(|x| x.to_string()).join(", "), String::from_utf8_lossy(&self.body) @@ -150,7 +140,11 @@ pub trait ReceivesNotifications: Send + Sync + Debug { /// Delivers the E-Mail message #[async_trait] pub trait DeliversMail: Send + Sync + Debug { - async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError>; + async fn deliver_mail( + &self, + meta: SessionMeta, + message: EmailMessage, + ) -> Result<(), DeliveryError>; } #[derive(Debug)] @@ -173,7 +167,11 @@ pub struct DummyDeliveryAgent; #[async_trait] impl DeliversMail for DummyDeliveryAgent { - async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError> { + async fn deliver_mail( + &self, + _meta: SessionMeta, + message: EmailMessage, + ) -> Result<(), DeliveryError> { warn!("DummyDeliveryAgent: {message}"); Ok(()) } From e1fbdf98a0b29a94fd0e906904fd386bbb52cc75 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Mon, 1 Jun 2026 12:48:28 +0000 Subject: [PATCH 8/9] Make notifications async, fix build, address comments --- ic-bn-lib/src/lib.rs | 1 + ic-bn-lib/src/smtp/inbound/ehlo.rs | 12 ++--- ic-bn-lib/src/smtp/inbound/mail_from.rs | 33 +++++-------- ic-bn-lib/src/smtp/inbound/manager.rs | 30 +++++++----- ic-bn-lib/src/smtp/inbound/mod.rs | 20 ++++++-- ic-bn-lib/src/smtp/inbound/rcpt_to.rs | 12 ++--- ic-bn-lib/src/smtp/inbound/session.rs | 64 ++++++++++--------------- ic-bn-lib/src/smtp/inbound/tests.rs | 21 ++++++-- ic-bn-lib/src/smtp/mod.rs | 6 +-- 9 files changed, 99 insertions(+), 100 deletions(-) diff --git a/ic-bn-lib/src/lib.rs b/ic-bn-lib/src/lib.rs index b4dc36c..b854cbe 100644 --- a/ic-bn-lib/src/lib.rs +++ b/ic-bn-lib/src/lib.rs @@ -37,6 +37,7 @@ pub use ic_bn_lib_common; #[cfg(feature = "smtp")] pub use mail_auth; pub use prometheus; +#[cfg(feature = "acme")] pub use rcgen; pub use reqwest; pub use rustls; diff --git a/ic-bn-lib/src/smtp/inbound/ehlo.rs b/ic-bn-lib/src/smtp/inbound/ehlo.rs index 9b4f1e8..e2eba79 100644 --- a/ic-bn-lib/src/smtp/inbound/ehlo.rs +++ b/ic-bn-lib/src/smtp/inbound/ehlo.rs @@ -20,8 +20,7 @@ impl Session { info!("{self}: {host}: Invalid EHLO hostname"); self.set_error(ProtocolError::InvalidEhloHostname(format!( "{host}: incorrect hostname" - ))) - .await; + ))); return self.reply("550", "5.5.0", "Invalid EHLO hostname.").await; }; @@ -37,8 +36,7 @@ impl Session { info!("{self}: {host}: EHLO is not FQDN"); self.set_error(ProtocolError::InvalidEhloHostname(format!( "{host}: not FQDN" - ))) - .await; + ))); return self .reply("550", "5.5.0", "EHLO hostname must be an FQDN.") .await; @@ -55,8 +53,7 @@ impl Session { info!("{self}: {host}: EHLO not found in DNS"); self.set_error(ProtocolError::InvalidEhloHostname(format!( "{host}: not found in DNS" - ))) - .await; + ))); return self .reply("550", "5.5.0", "EHLO hostname not found in DNS.") .await; @@ -69,8 +66,7 @@ impl Session { if is_error_negative_lookup(&e) { self.set_error(ProtocolError::InvalidEhloHostname(format!( "{host}: not found in DNS: {e:#}" - ))) - .await; + ))); return self .reply("550", "5.5.0", "EHLO hostname not found in DNS.") diff --git a/ic-bn-lib/src/smtp/inbound/mail_from.rs b/ic-bn-lib/src/smtp/inbound/mail_from.rs index 780f104..f9b9863 100644 --- a/ic-bn-lib/src/smtp/inbound/mail_from.rs +++ b/ic-bn-lib/src/smtp/inbound/mail_from.rs @@ -25,8 +25,7 @@ impl Session { let Some(helo_hostname) = self.data.ehlo_hostname.as_ref().map(|x| x.to_string()) else { self.set_error(ProtocolError::InvalidSequenceOfCommands( "MAIL FROM before EHLO".into(), - )) - .await; + )); return self .reply("503", "5.5.1", "Polite people say EHLO first.") .await; @@ -35,8 +34,7 @@ impl Session { if self.data.mail_from.is_some() { self.set_error(ProtocolError::InvalidSequenceOfCommands( "Multiple MAIL FROM".into(), - )) - .await; + )); return self .reply( "503", @@ -68,8 +66,7 @@ impl Session { self.set_error(ProtocolError::MessageTooBig(format!( "MAIL FROM-specified size is too big: {} > {}", from.size, self.cfg.max_message_size - ))) - .await; + ))); return self.message_too_big().await; } @@ -87,8 +84,7 @@ impl Session { self.set_error(ProtocolError::SenderValidationFailed(format!( "Incorrect sender address: {}", from.address - ))) - .await; + ))); return self .reply("550", "5.7.1", "Sender address is incorrect.") .await; @@ -110,8 +106,7 @@ impl Session { self.set_error(ProtocolError::SenderValidationFailed(format!( "Sender domain is not FQDN: {}", address.domain() - ))) - .await; + ))); return self.reply("550", "5.7.2", "Sender must be an FQDN.").await; }; @@ -129,8 +124,7 @@ impl Session { ); self.set_error(ProtocolError::SenderValidationFailed( "No MX records found".into(), - )) - .await; + )); return self .reply( "550", @@ -147,8 +141,7 @@ impl Session { ); self.set_error(ProtocolError::SenderValidationFailed( "No MX records found".into(), - )) - .await; + )); return self .reply( "550", @@ -162,8 +155,7 @@ impl Session { ); self.set_error(ProtocolError::SenderValidationFailed(format!( "Sender domain verification temporary error: {e:#}", - ))) - .await; + ))); return self .reply("451", "4.7.25", "Temporary error validating sender domain.") .await; @@ -196,8 +188,7 @@ impl Session { self.set_error(ProtocolError::SpfValidationFailed(format!( "SPF validation temporary error: {:?}", output.explanation() - ))) - .await; + ))); return self .reply("451", "4.7.24", "Temporary SPF validation error.") .await; @@ -210,8 +201,7 @@ impl Session { self.set_error(ProtocolError::SpfValidationFailed(format!( "SPF validation permanent error: {:?}", output.explanation() - ))) - .await; + ))); return self .reply_with("550", "5.7.23", |buf| { write!(buf, "SPF validation failed")?; @@ -235,8 +225,7 @@ impl Session { /// Replies about failed reverse IP verification async fn verify_reverse_ip_reply(&mut self, permanent: bool, msg: &str) -> SessionResult { - self.set_error(ProtocolError::ReverseIpValidationFailed(msg.into())) - .await; + self.set_error(ProtocolError::ReverseIpValidationFailed(msg.into())); // Emit permanent errors only if in strict mode if permanent && self.cfg.verify_reverse_ip_strict { diff --git a/ic-bn-lib/src/smtp/inbound/manager.rs b/ic-bn-lib/src/smtp/inbound/manager.rs index 80e3466..f93e247 100644 --- a/ic-bn-lib/src/smtp/inbound/manager.rs +++ b/ic-bn-lib/src/smtp/inbound/manager.rs @@ -33,6 +33,7 @@ impl SessionManager { match session.handle(shutdown_token.child_token()).await { Ok(v) => match v { SessionUpgrade::No => { + Self::notify(&session, None).await; session.stream.shutdown().await.ok(); } SessionUpgrade::StartTls => { @@ -49,7 +50,7 @@ impl SessionManager { debug!("{session}: error closing connection: {e:#}"); }; - Self::notify(session, e).await; + Self::notify(&session, Some(e)).await; } } } @@ -75,7 +76,7 @@ impl SessionManager { debug!("{session}: error closing connection: {e:#}"); }; - Self::notify(session, e).await; + Self::notify(&session, Some(e)).await; } } @@ -85,9 +86,12 @@ impl SessionManager { }; } - async fn notify(session: Session, error: SessionError) { - if let Some(v) = &session.cfg.notifications_handler { - v.notify_session_finish(session.meta(), error).await; + async fn notify(session: &Session, error: Option) { + if let Some(v) = session.cfg.notifications_handler.clone() { + let meta = session.meta(); + tokio::spawn(async move { + v.notify_session_finish(meta, error).await; + }); } } } @@ -111,15 +115,17 @@ impl Session { let error_str = e.to_string(); // Session is partially consumed by `tls_handshake`, so we can't use `Manager::notify()` - if let Some(v) = &self.cfg.notifications_handler { - v.notify_session_finish( - meta, - SessionError::TlsHandshakeFailed(error_str.clone()), - ) - .await; + if let Some(v) = self.cfg.notifications_handler.clone() { + tokio::spawn(async move { + v.notify_session_finish( + meta, + Some(SessionError::TlsHandshakeFailed(error_str.clone())), + ) + .await; + }); } - return Err(SessionError::TlsHandshakeFailed(error_str)); + return Err(SessionError::TlsHandshakeFailed(e.to_string())); } }; diff --git a/ic-bn-lib/src/smtp/inbound/mod.rs b/ic-bn-lib/src/smtp/inbound/mod.rs index 531b029..9439efb 100644 --- a/ic-bn-lib/src/smtp/inbound/mod.rs +++ b/ic-bn-lib/src/smtp/inbound/mod.rs @@ -31,8 +31,8 @@ use uuid::Uuid; use crate::{ network::AsyncReadWrite, smtp::{ - DeliversMail, DummyDeliveryAgent, DummyRecipientResolver, ProtocolError, - ReceivesNotifications, ResolvesRecipient, address::EmailAddress, + DeliversMail, DummyDeliveryAgent, DummyRecipientResolver, EmailMessage, MessageError, + ProtocolError, ReceivesNotifications, ResolvesRecipient, address::EmailAddress, }, }; @@ -328,15 +328,25 @@ impl Session { } } - async fn set_error(&mut self, error: ProtocolError) { + fn set_error(&mut self, error: ProtocolError) { self.data.last_error = Some(error.clone()); self.counters.errors += 1; - if let Some(v) = &self.cfg.notifications_handler { - v.notify_protocol_error(self.meta(), error).await; + if let Some(v) = self.cfg.notifications_handler.clone() { + let meta = self.meta(); + tokio::spawn(async move { v.notify_protocol_error(meta, error).await }); } } + fn notify_message(&self, msg: EmailMessage, error: Option) { + if let Some(v) = self.cfg.notifications_handler.clone() { + let meta = self.meta(); + tokio::spawn(async move { + v.notify_message(meta, msg, error).await; + }); + }; + } + fn meta(&self) -> SessionMeta { SessionMeta { id: self.id, diff --git a/ic-bn-lib/src/smtp/inbound/rcpt_to.rs b/ic-bn-lib/src/smtp/inbound/rcpt_to.rs index c6ff10d..736d17d 100644 --- a/ic-bn-lib/src/smtp/inbound/rcpt_to.rs +++ b/ic-bn-lib/src/smtp/inbound/rcpt_to.rs @@ -21,8 +21,7 @@ impl Session { let Some(mail_from) = &self.data.mail_from else { self.set_error(ProtocolError::InvalidSequenceOfCommands( "RCPT TO before MAIL FROM".into(), - )) - .await; + )); return self .reply("503", "5.5.1", "MAIL FROM is required first.") .await; @@ -42,8 +41,7 @@ impl Session { self.set_error(ProtocolError::RecipientValidationFailed(format!( "Incorrect address: {}", to.address - ))) - .await; + ))); return self.reply("550", "5.1.2", "Incorrect address.").await; }; @@ -57,8 +55,7 @@ impl Session { "Too many recipients: {} > {}", self.data.rcpt_to.len(), self.cfg.max_recipients - ))) - .await; + ))); return self.reply("455", "4.5.3", "Too many recipients.").await; } @@ -89,8 +86,7 @@ impl Session { info!("{self}: {}: recipient resolution error: {e:#}", to.address); self.set_error(ProtocolError::RecipientValidationFailed(format!( "Recipient resolution failed: {e:#}", - ))) - .await; + ))); return match e { RecipientResolveError::UnknownDomain => { diff --git a/ic-bn-lib/src/smtp/inbound/session.rs b/ic-bn-lib/src/smtp/inbound/session.rs index a0fd9f9..e2c1a40 100644 --- a/ic-bn-lib/src/smtp/inbound/session.rs +++ b/ic-bn-lib/src/smtp/inbound/session.rs @@ -94,8 +94,7 @@ impl Session { "{} > {}", self.data.message.len(), max_size - ))) - .await; + ))); self.reply_with("552", "5.3.4", |buf| { write!(buf, "Message too big, we accept up to {max_size} bytes.",) }) @@ -131,8 +130,7 @@ impl Session { } async fn handle_error(&mut self, error: SmtpError) -> SessionResult<()> { - self.set_error(ProtocolError::SmtpError(error.to_string())) - .await; + self.set_error(ProtocolError::SmtpError(error.to_string())); let (code, ext, msg) = match error { SmtpError::UnknownCommand | SmtpError::InvalidResponse { .. } => { @@ -203,8 +201,7 @@ impl Session { self.reply("250", "2.0.0", "OK").await?; } _ => { - self.set_error(ProtocolError::SmtpError("Command not implemented".into())) - .await; + self.set_error(ProtocolError::SmtpError("Command not implemented".into())); self.reply("502", "5.5.1", "Command not implemented.") .await?; } @@ -294,14 +291,12 @@ impl Session { if self.tls_info.is_some() { self.set_error(ProtocolError::InvalidSequenceOfCommands( "STARTTLS inside STARTTLS".into(), - )) - .await; + )); self.reply("504", "5.7.4", "Already in TLS mode.").await?; } else if !self.cfg.tls_mode.enabled() { self.set_error(ProtocolError::InvalidSequenceOfCommands( "STARTTLS without TLS enabled".into(), - )) - .await; + )); self.reply("502", "5.7.0", "TLS not available.").await?; } else { self.reply("220", "2.0.0", "Ready to start TLS.").await?; @@ -361,8 +356,7 @@ impl Session { SessionState::RequestTooLarge(rx) => { // If line-feed found - issue error, otherwise keep ingesting if rx.ingest(&mut iter) { - self.set_error(ProtocolError::SmtpError("Line is too long".into())) - .await; + self.set_error(ProtocolError::SmtpError("Line is too long".into())); self.reply("554", "5.3.4", "Line is too long.").await?; state = SessionState::default(); } else { @@ -513,14 +507,16 @@ impl Session { // Strict: Check that *all* signatures have passed validation (or there are none) // else: Check if *any* of the signatures have passed validation (or there are none) - if (strict && outputs.len() != signatures_passed) - || (!strict && !outputs.is_empty() && signatures_passed == 0) - { - if outputs.len() != signatures_passed { - let error = first_error.unwrap_or_default(); - reply(&error).await?; - return Ok(Some(MessageError::DkimValidationFailed(error))); - } + let is_valid = if strict { + signatures_passed == outputs.len() + } else { + signatures_passed != 0 || outputs.is_empty() + }; + + if !is_valid { + let error = first_error.unwrap_or_default(); + reply(&error).await?; + return Ok(Some(MessageError::DkimValidationFailed(error))); } if signatures_passed > 0 { @@ -530,6 +526,11 @@ impl Session { msg.rcpt_to, outputs.len() ); + } else { + debug!( + "{log_name}: {} -> {:?}: No DKIM signatures found", + msg.mail_from, msg.rcpt_to, + ); } } @@ -551,10 +552,7 @@ impl Session { // Run configured verification steps on the message body if let Some(e) = self.verify_message(&msg).await? { - if let Some(v) = &self.cfg.notifications_handler { - v.notify_message(self.meta(), msg.clone(), Some(e)).await; - } - + self.notify_message(msg.clone(), Some(e)); self.reset_message(); return Ok(()); } @@ -572,16 +570,9 @@ impl Session { msg.mail_from, msg.rcpt_to ); - if let Some(v) = &self.cfg.notifications_handler { - v.notify_message( - self.meta(), - msg.clone(), - Some(MessageError::DeliveryFailed(e.clone())), - ) - .await; - } - + self.notify_message(msg.clone(), Some(MessageError::DeliveryFailed(e.clone()))); self.reset_message(); + return match e { DeliveryError::Permanent(v) => { self.reply_with("550", "5.5.0", |buf| { @@ -598,9 +589,7 @@ impl Session { }; } - if let Some(v) = &self.cfg.notifications_handler { - v.notify_message(self.meta(), msg.clone(), None).await; - } + self.notify_message(msg.clone(), None); info!( "{self}: {} -> {:?}: message ({message_size} bytes) queued with id {id}", @@ -628,8 +617,7 @@ impl Session { } else if self.data.rcpt_to.is_empty() { self.set_error(ProtocolError::InvalidSequenceOfCommands( "DATA before RCPT TO".into(), - )) - .await; + )); self.reply("503", "5.5.1", "RCPT TO is required first.") .await?; return Ok(false); diff --git a/ic-bn-lib/src/smtp/inbound/tests.rs b/ic-bn-lib/src/smtp/inbound/tests.rs index 6e3983f..525e871 100644 --- a/ic-bn-lib/src/smtp/inbound/tests.rs +++ b/ic-bn-lib/src/smtp/inbound/tests.rs @@ -45,7 +45,7 @@ impl DeliversMail for TestDeliveryAgent { #[derive(Debug, Default)] pub struct TestNotificationsReceiver { msg: Mutex)>>, - sess: Mutex>, + sess: Mutex)>>, proto_error: Mutex>, } @@ -64,7 +64,7 @@ impl ReceivesNotifications for TestNotificationsReceiver { *self.proto_error.lock().unwrap() = Some((meta, error)); } - async fn notify_session_finish(&self, meta: SessionMeta, error: SessionError) { + async fn notify_session_finish(&self, meta: SessionMeta, error: Option) { *self.sess.lock().unwrap() = Some((meta, error)); } } @@ -645,6 +645,8 @@ async fn test_with_smtp_client() { ); client.send(message).await.unwrap(); + // Send some bad command to emit a protocol error notification + client.cmd(b"FOOBAR\r\n").await.ok(); client.quit().await.unwrap(); // Make sure the agent gets the correct mail @@ -680,9 +682,20 @@ async fn test_with_smtp_client() { let (meta, error) = notification_handler.sess.lock().unwrap().take().unwrap(); assert_eq!(meta.id, Uuid::nil()); assert_eq!(meta.remote_ip, IpAddr::from_str("127.0.0.1").unwrap()); - assert!(meta.last_error.is_none()); + assert!(matches!(meta.last_error, Some(ProtocolError::SmtpError(_)))); assert_eq!(meta.tls_info.unwrap().protocol, ProtocolVersion::TLSv1_3); - assert!(matches!(error, SessionError::Quit)); + assert!(matches!(error, Some(SessionError::Quit))); + + // Error from the FOOBAR command + let (meta, error) = notification_handler + .proto_error + .lock() + .unwrap() + .take() + .unwrap(); + assert_eq!(meta.id, Uuid::nil()); + assert_eq!(meta.remote_ip, IpAddr::from_str("127.0.0.1").unwrap()); + assert!(matches!(error, ProtocolError::SmtpError(_))); } #[test] diff --git a/ic-bn-lib/src/smtp/mod.rs b/ic-bn-lib/src/smtp/mod.rs index bf129c1..3bac6a0 100644 --- a/ic-bn-lib/src/smtp/mod.rs +++ b/ic-bn-lib/src/smtp/mod.rs @@ -51,7 +51,7 @@ pub enum DeliveryError { Permanent(String), } -/// Error that might happen during message vaildation or delivery +/// Error that might happen during message validation or delivery #[derive(thiserror::Error, Clone, Debug, IntoStaticStr)] #[strum(serialize_all = "snake_case")] pub enum MessageError { @@ -124,7 +124,7 @@ pub trait ResolvesRecipient: Send + Sync + Debug { /// Notifies about events #[async_trait] pub trait ReceivesNotifications: Send + Sync + Debug { - /// Notify when the message is queued + /// Notify when the message is queued or the validation failed async fn notify_message( &self, meta: SessionMeta, @@ -134,7 +134,7 @@ pub trait ReceivesNotifications: Send + Sync + Debug { /// Notify when the protocol error happens async fn notify_protocol_error(&self, meta: SessionMeta, error: ProtocolError); /// Notify when the session is finished - async fn notify_session_finish(&self, meta: SessionMeta, error: SessionError); + async fn notify_session_finish(&self, meta: SessionMeta, error: Option); } /// Delivers the E-Mail message From 4178de2aeb8e612d773cbe45477da28b5b61adeb Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Mon, 1 Jun 2026 13:17:08 +0000 Subject: [PATCH 9/9] use Arc for a bit more efficiency --- ic-bn-lib/src/smtp/ic/delivery_agent.rs | 12 ++++++------ ic-bn-lib/src/smtp/inbound/mod.rs | 2 +- ic-bn-lib/src/smtp/inbound/session.rs | 5 +++-- ic-bn-lib/src/smtp/inbound/tests.rs | 26 ++++++++++++------------- ic-bn-lib/src/smtp/mod.rs | 11 +++++++---- 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/ic-bn-lib/src/smtp/ic/delivery_agent.rs b/ic-bn-lib/src/smtp/ic/delivery_agent.rs index fd90659..cdc7aa8 100644 --- a/ic-bn-lib/src/smtp/ic/delivery_agent.rs +++ b/ic-bn-lib/src/smtp/ic/delivery_agent.rs @@ -246,7 +246,7 @@ impl DeliversMail for IcSmtpDeliveryAgent { async fn deliver_mail( &self, meta: SessionMeta, - message: EmailMessage, + message: Arc, ) -> Result<(), DeliveryError> { info!( "{self}: delivering mail, ehlo: {:?}, from: '{}', to: '{:?}', id '{}'", @@ -261,17 +261,17 @@ impl DeliversMail for IcSmtpDeliveryAgent { // The future in this loop usually resolves instantly due to the nature of the SMTP protocol. // Before the mail is delivered it goes through an RCPT TO sequence which populates the cache. // So making it concurrent isn't worth it probably currently. - for rcpt in message.rcpt_to { + for rcpt in &message.rcpt_to { // Figure out which canister we should talk to let canister_id = self - .resolve_canister_id(&rcpt) + .resolve_canister_id(rcpt) .await .ok_or_else(|| DeliveryError::Permanent("Unknown domain".into()))?; if let Some(v) = mapping.get_mut(&canister_id) { - v.push(rcpt); + v.push(rcpt.clone()); } else { - mapping.insert(canister_id, vec![rcpt]); + mapping.insert(canister_id, vec![rcpt.clone()]); } } @@ -641,7 +641,7 @@ mod tests { rcpt_to: vec![], }; delivery_agent - .deliver_mail(meta, message.clone()) + .deliver_mail(meta, Arc::new(message.clone())) .await .unwrap(); diff --git a/ic-bn-lib/src/smtp/inbound/mod.rs b/ic-bn-lib/src/smtp/inbound/mod.rs index 9439efb..8ceca19 100644 --- a/ic-bn-lib/src/smtp/inbound/mod.rs +++ b/ic-bn-lib/src/smtp/inbound/mod.rs @@ -338,7 +338,7 @@ impl Session { } } - fn notify_message(&self, msg: EmailMessage, error: Option) { + fn notify_message(&self, msg: Arc, error: Option) { if let Some(v) = self.cfg.notifications_handler.clone() { let meta = self.meta(); tokio::spawn(async move { diff --git a/ic-bn-lib/src/smtp/inbound/session.rs b/ic-bn-lib/src/smtp/inbound/session.rs index e2c1a40..7e79686 100644 --- a/ic-bn-lib/src/smtp/inbound/session.rs +++ b/ic-bn-lib/src/smtp/inbound/session.rs @@ -2,6 +2,7 @@ use std::{ borrow::Cow, fmt::{self, Write as _}, io::Write as _, + sync::Arc, time::{Duration, Instant}, }; @@ -543,12 +544,12 @@ impl Session { // SAFETY: Code makes sure these are all Some(). // It's better to panic in tests if they are not. - let msg = EmailMessage { + let msg = Arc::new(EmailMessage { id, mail_from: self.data.mail_from.take().unwrap(), rcpt_to: std::mem::take(&mut self.data.rcpt_to), body: std::mem::take(&mut self.data.message).into(), - }; + }); // Run configured verification steps on the message body if let Some(e) = self.verify_message(&msg).await? { diff --git a/ic-bn-lib/src/smtp/inbound/tests.rs b/ic-bn-lib/src/smtp/inbound/tests.rs index 525e871..f2b89d2 100644 --- a/ic-bn-lib/src/smtp/inbound/tests.rs +++ b/ic-bn-lib/src/smtp/inbound/tests.rs @@ -23,14 +23,14 @@ use crate::{ use super::*; #[derive(Debug, Default)] -pub struct TestDeliveryAgent(Mutex>, Option); +pub struct TestDeliveryAgent(Mutex>>, Option); #[async_trait] impl DeliversMail for TestDeliveryAgent { async fn deliver_mail( &self, _meta: SessionMeta, - message: EmailMessage, + message: Arc, ) -> Result<(), DeliveryError> { if let Some(e) = &self.1 { return Err(e.clone()); @@ -44,7 +44,7 @@ impl DeliversMail for TestDeliveryAgent { #[allow(clippy::type_complexity)] #[derive(Debug, Default)] pub struct TestNotificationsReceiver { - msg: Mutex)>>, + msg: Mutex, Option)>>, sess: Mutex)>>, proto_error: Mutex>, } @@ -54,7 +54,7 @@ impl ReceivesNotifications for TestNotificationsReceiver { async fn notify_message( &self, meta: SessionMeta, - message: EmailMessage, + message: Arc, error: Option, ) { *self.msg.lock().unwrap() = Some((meta, message, error)); @@ -268,13 +268,13 @@ async fn test_bdat() { // Make sure the agent gets the correct mail assert_eq!( - agent.0.lock().unwrap().clone(), - Some(EmailMessage { + agent.0.lock().unwrap().clone().unwrap().as_ref(), + &EmailMessage { id: Uuid::nil(), mail_from: "foo@bar".try_into().unwrap(), rcpt_to: vec!["bar@baz".try_into().unwrap()], body: "012345678998765432100123456789".into(), - }) + } ); } @@ -309,13 +309,13 @@ async fn test_data() { // Make sure the agent gets the correct mail assert_eq!( - agent.0.lock().unwrap().clone(), - Some(EmailMessage { + agent.0.lock().unwrap().clone().unwrap().as_ref(), + &EmailMessage { id: Uuid::nil(), mail_from: email!("foo@bar"), rcpt_to: vec![email!("bar@baz")], body: "foobarmessage".into(), - }) + } ) } @@ -353,13 +353,13 @@ async fn test_expand() { // Make sure the agent gets the correct mail assert_eq!( - agent.0.lock().unwrap().clone(), - Some(EmailMessage { + agent.0.lock().unwrap().clone().unwrap().as_ref(), + &EmailMessage { id: Uuid::nil(), mail_from: email!("foo@bar"), rcpt_to: vec![email!("dead@beef"), email!("dead@dead"), email!("bar@bax"),], body: "foobarmessage".into(), - }) + } ) } diff --git a/ic-bn-lib/src/smtp/mod.rs b/ic-bn-lib/src/smtp/mod.rs index 3bac6a0..ca5ef19 100644 --- a/ic-bn-lib/src/smtp/mod.rs +++ b/ic-bn-lib/src/smtp/mod.rs @@ -1,4 +1,7 @@ -use std::fmt::{Debug, Display}; +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; use async_trait::async_trait; use bytes::Bytes; @@ -128,7 +131,7 @@ pub trait ReceivesNotifications: Send + Sync + Debug { async fn notify_message( &self, meta: SessionMeta, - message: EmailMessage, + message: Arc, error: Option, ); /// Notify when the protocol error happens @@ -143,7 +146,7 @@ pub trait DeliversMail: Send + Sync + Debug { async fn deliver_mail( &self, meta: SessionMeta, - message: EmailMessage, + message: Arc, ) -> Result<(), DeliveryError>; } @@ -170,7 +173,7 @@ impl DeliversMail for DummyDeliveryAgent { async fn deliver_mail( &self, _meta: SessionMeta, - message: EmailMessage, + message: Arc, ) -> Result<(), DeliveryError> { warn!("DummyDeliveryAgent: {message}"); Ok(())