diff --git a/ic-bn-lib/src/lib.rs b/ic-bn-lib/src/lib.rs index 44235ef..b854cbe 100644 --- a/ic-bn-lib/src/lib.rs +++ b/ic-bn-lib/src/lib.rs @@ -37,6 +37,8 @@ pub use ic_bn_lib_common; #[cfg(feature = "smtp")] pub use mail_auth; pub use prometheus; +#[cfg(feature = "acme")] +pub use rcgen; pub use reqwest; pub use rustls; #[cfg(feature = "acme-alpn")] diff --git a/ic-bn-lib/src/smtp/ic/delivery_agent.rs b/ic-bn-lib/src/smtp/ic/delivery_agent.rs index 1c72251..cdc7aa8 100644 --- a/ic-bn-lib/src/smtp/ic/delivery_agent.rs +++ b/ic-bn-lib/src/smtp/ic/delivery_agent.rs @@ -22,6 +22,7 @@ use crate::{ candid::{Envelope, SmtpRequest, SmtpResponse}, parse_email, }, + inbound::SessionMeta, }, }; @@ -242,10 +243,14 @@ impl IcSmtpDeliveryAgent { #[async_trait] impl DeliversMail for IcSmtpDeliveryAgent { - async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError> { + async fn deliver_mail( + &self, + meta: SessionMeta, + message: Arc, + ) -> Result<(), DeliveryError> { info!( - "{self}: delivering mail, ehlo: '{}', from: '{}', to: '{:?}', id '{}'", - message.ehlo_hostname, message.mail_from, message.rcpt_to, message.id + "{self}: delivering mail, ehlo: {:?}, from: '{}', to: '{:?}', id '{}'", + meta.ehlo_hostname, message.mail_from, message.rcpt_to, message.id ); // A single message can be (potentially) destined for several canisters/domains. @@ -256,17 +261,17 @@ impl DeliversMail for IcSmtpDeliveryAgent { // The future in this loop usually resolves instantly due to the nature of the SMTP protocol. // Before the mail is delivered it goes through an RCPT TO sequence which populates the cache. // So making it concurrent isn't worth it probably currently. - for rcpt in message.rcpt_to { + for rcpt in &message.rcpt_to { // Figure out which canister we should talk to let canister_id = self - .resolve_canister_id(&rcpt) + .resolve_canister_id(rcpt) .await .ok_or_else(|| DeliveryError::Permanent("Unknown domain".into()))?; if let Some(v) = mapping.get_mut(&canister_id) { - v.push(rcpt); + v.push(rcpt.clone()); } else { - mapping.insert(canister_id, vec![rcpt]); + mapping.insert(canister_id, vec![rcpt.clone()]); } } @@ -359,14 +364,20 @@ impl ResolvesRecipient for IcSmtpDeliveryAgent { #[cfg(test)] mod tests { - use std::sync::{ - Mutex, - atomic::{AtomicUsize, Ordering}, + use std::{ + net::IpAddr, + sync::{ + Mutex, + atomic::{AtomicUsize, Ordering}, + }, }; use crate::{ email, - smtp::ic::candid::{Header, Message, SmtpOk, SmtpRequestError}, + smtp::{ + ic::candid::{Header, Message, SmtpOk, SmtpRequestError}, + inbound::SessionCounters, + }, }; use super::*; @@ -607,7 +618,6 @@ mod tests { let message = EmailMessage { id: Uuid::nil(), - ehlo_hostname: fqdn!("foo.bar"), mail_from: email!("john@doe.com"), rcpt_to: vec![ // these two go to qoctq-giaaa-aaaaa-aaaea-cai as a single mail @@ -619,7 +629,21 @@ mod tests { body: message.as_bytes().into(), }; - delivery_agent.deliver_mail(message.clone()).await.unwrap(); + let meta = SessionMeta { + id: Uuid::nil(), + message_id: Uuid::nil(), + remote_ip: IpAddr::from_str("1.1.1.1").unwrap(), + tls_info: None, + ehlo_hostname: None, + counters: SessionCounters::new(), + last_error: None, + mail_from: None, + rcpt_to: vec![], + }; + delivery_agent + .deliver_mail(meta, Arc::new(message.clone())) + .await + .unwrap(); let body = indoc! {r#" --XXXXboundary text diff --git a/ic-bn-lib/src/smtp/inbound/ehlo.rs b/ic-bn-lib/src/smtp/inbound/ehlo.rs index 70e2a28..e2eba79 100644 --- a/ic-bn-lib/src/smtp/inbound/ehlo.rs +++ b/ic-bn-lib/src/smtp/inbound/ehlo.rs @@ -6,7 +6,10 @@ use tracing::{debug, info}; use crate::{ http::dns::is_error_negative_lookup, network::AsyncReadWrite, - smtp::inbound::{Session, SessionResult}, + smtp::{ + ProtocolError, + inbound::{Session, SessionResult}, + }, }; impl Session { @@ -15,6 +18,9 @@ impl Session { // Validate hostname let Ok(ehlo_hostname) = FQDN::from_str(host) else { info!("{self}: {host}: Invalid EHLO hostname"); + self.set_error(ProtocolError::InvalidEhloHostname(format!( + "{host}: incorrect hostname" + ))); return self.reply("550", "5.5.0", "Invalid EHLO hostname.").await; }; @@ -28,6 +34,9 @@ impl Session { if ehlo_hostname.depth() < 2 { info!("{self}: {host}: EHLO is not FQDN"); + self.set_error(ProtocolError::InvalidEhloHostname(format!( + "{host}: not FQDN" + ))); return self .reply("550", "5.5.0", "EHLO hostname must be an FQDN.") .await; @@ -42,6 +51,9 @@ impl Session { } None => { info!("{self}: {host}: EHLO not found in DNS"); + self.set_error(ProtocolError::InvalidEhloHostname(format!( + "{host}: not found in DNS" + ))); return self .reply("550", "5.5.0", "EHLO hostname not found in DNS.") .await; @@ -52,6 +64,10 @@ impl Session { info!("{self}: {host}: EHLO not found in DNS: {e:#}"); if is_error_negative_lookup(&e) { + self.set_error(ProtocolError::InvalidEhloHostname(format!( + "{host}: not found in DNS: {e:#}" + ))); + return self .reply("550", "5.5.0", "EHLO hostname not found in DNS.") .await; diff --git a/ic-bn-lib/src/smtp/inbound/mail_from.rs b/ic-bn-lib/src/smtp/inbound/mail_from.rs index 2a4de91..f9b9863 100644 --- a/ic-bn-lib/src/smtp/inbound/mail_from.rs +++ b/ic-bn-lib/src/smtp/inbound/mail_from.rs @@ -13,6 +13,7 @@ use crate::{ http::dns::is_error_negative_lookup, network::AsyncReadWrite, smtp::{ + ProtocolError, address::EmailAddress, inbound::{Session, SessionResult}, }, @@ -22,12 +23,18 @@ impl Session { /// Handles MAIL FROM command pub async fn handle_mail_from(&mut self, from: MailFrom>) -> SessionResult<()> { let Some(helo_hostname) = self.data.ehlo_hostname.as_ref().map(|x| x.to_string()) else { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "MAIL FROM before EHLO".into(), + )); return self .reply("503", "5.5.1", "Polite people say EHLO first.") .await; }; if self.data.mail_from.is_some() { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "Multiple MAIL FROM".into(), + )); return self .reply( "503", @@ -56,6 +63,10 @@ impl Session { } if from.size > self.cfg.max_message_size { + self.set_error(ProtocolError::MessageTooBig(format!( + "MAIL FROM-specified size is too big: {} > {}", + from.size, self.cfg.max_message_size + ))); return self.message_too_big().await; } @@ -70,6 +81,10 @@ impl Session { // Validate address let Ok(address) = EmailAddress::from_str(&from.address) else { info!("{self}: {}: incorrect sender address", from.address); + self.set_error(ProtocolError::SenderValidationFailed(format!( + "Incorrect sender address: {}", + from.address + ))); return self .reply("550", "5.7.1", "Sender address is incorrect.") .await; @@ -88,6 +103,10 @@ impl Session { if self.cfg.verify_sender_domain { if address.domain().depth() < 2 { info!("{self}: {address}: sender domain verification failed: not FQDN"); + self.set_error(ProtocolError::SenderValidationFailed(format!( + "Sender domain is not FQDN: {}", + address.domain() + ))); return self.reply("550", "5.7.2", "Sender must be an FQDN.").await; }; @@ -103,6 +122,9 @@ impl Session { info!( "{self}: {address}: sender domain verification failed: no MX records found" ); + self.set_error(ProtocolError::SenderValidationFailed( + "No MX records found".into(), + )); return self .reply( "550", @@ -117,6 +139,9 @@ impl Session { info!( "{self}: {address}: sender domain verification failed: no MX records found" ); + self.set_error(ProtocolError::SenderValidationFailed( + "No MX records found".into(), + )); return self .reply( "550", @@ -128,6 +153,9 @@ impl Session { info!( "{self}: {address}: sender domain verification failed: temporary error: {e:#}" ); + self.set_error(ProtocolError::SenderValidationFailed(format!( + "Sender domain verification temporary error: {e:#}", + ))); return self .reply("451", "4.7.25", "Temporary error validating sender domain.") .await; @@ -157,7 +185,10 @@ impl Session { "{self}: {address}: SPF validation failed: temporary error: {:?}", output.explanation() ); - + self.set_error(ProtocolError::SpfValidationFailed(format!( + "SPF validation temporary error: {:?}", + output.explanation() + ))); return self .reply("451", "4.7.24", "Temporary SPF validation error.") .await; @@ -167,7 +198,10 @@ impl Session { "{self}: {address}: SPF validation failed: permanent error: {:?}", output.explanation() ); - + self.set_error(ProtocolError::SpfValidationFailed(format!( + "SPF validation permanent error: {:?}", + output.explanation() + ))); return self .reply_with("550", "5.7.23", |buf| { write!(buf, "SPF validation failed")?; @@ -191,6 +225,8 @@ impl Session { /// Replies about failed reverse IP verification async fn verify_reverse_ip_reply(&mut self, permanent: bool, msg: &str) -> SessionResult { + self.set_error(ProtocolError::ReverseIpValidationFailed(msg.into())); + // Emit permanent errors only if in strict mode if permanent && self.cfg.verify_reverse_ip_strict { self.reply_with("550", "5.7.25", |buf| { @@ -309,7 +345,7 @@ impl Session { .await; } - // Otherwise everything succeeded but no matches found + // Otherwise everything succeeded but no matches were found info!( "{self}: reverse IP verification failed: no addresses matching client's IP found after resolving PTR" ); diff --git a/ic-bn-lib/src/smtp/inbound/manager.rs b/ic-bn-lib/src/smtp/inbound/manager.rs index 55124eb..f93e247 100644 --- a/ic-bn-lib/src/smtp/inbound/manager.rs +++ b/ic-bn-lib/src/smtp/inbound/manager.rs @@ -33,11 +33,11 @@ impl SessionManager { match session.handle(shutdown_token.child_token()).await { Ok(v) => match v { SessionUpgrade::No => { + Self::notify(&session, None).await; session.stream.shutdown().await.ok(); } - SessionUpgrade::StartTls => { - Self::starttls(session, shutdown_token.child_token()).await + Self::handle_connection_tls(session, shutdown_token.child_token()).await } }, @@ -49,15 +49,20 @@ impl SessionManager { if let Err(e) = session.shutdown().await { debug!("{session}: error closing connection: {e:#}"); }; + + Self::notify(&session, Some(e)).await; } } } /// Converts session into TLS mode & runs it - async fn starttls(session: Session, shutdown_token: CancellationToken) { + async fn handle_connection_tls( + session: Session, + shutdown_token: CancellationToken, + ) { let session_name = session.to_string(); + debug!("{session}: starting TLS handshake"); - debug!("{session_name}: starting TLS handshake"); match session.into_tls().await { Ok(mut session) => { debug!("{session}: TLS handshake succeeded"); @@ -70,6 +75,8 @@ impl SessionManager { if let Err(e) = session.shutdown().await { debug!("{session}: error closing connection: {e:#}"); }; + + Self::notify(&session, Some(e)).await; } } @@ -78,6 +85,15 @@ impl SessionManager { } }; } + + async fn notify(session: &Session, error: Option) { + if let Some(v) = session.cfg.notifications_handler.clone() { + let meta = session.meta(); + tokio::spawn(async move { + v.notify_session_finish(meta, error).await; + }); + } + } } impl Session { @@ -92,7 +108,26 @@ impl Session { } }; - let (stream, tls_info) = tls_handshake(tls_config, self.stream).await?; + let meta = self.meta(); + let (stream, tls_info) = match tls_handshake(tls_config, self.stream).await { + Ok(v) => v, + Err(e) => { + let error_str = e.to_string(); + + // Session is partially consumed by `tls_handshake`, so we can't use `Manager::notify()` + if let Some(v) = self.cfg.notifications_handler.clone() { + tokio::spawn(async move { + v.notify_session_finish( + meta, + Some(SessionError::TlsHandshakeFailed(error_str.clone())), + ) + .await; + }); + } + + return Err(SessionError::TlsHandshakeFailed(e.to_string())); + } + }; Ok(Session { id: self.id, diff --git a/ic-bn-lib/src/smtp/inbound/mod.rs b/ic-bn-lib/src/smtp/inbound/mod.rs index b681e42..8ceca19 100644 --- a/ic-bn-lib/src/smtp/inbound/mod.rs +++ b/ic-bn-lib/src/smtp/inbound/mod.rs @@ -25,20 +25,24 @@ use smtp_proto::{ BdatReceiver, DataReceiver, DummyDataReceiver, DummyLineReceiver, RequestReceiver, }, }; -use strum::Display; +use strum::{Display, IntoStaticStr}; use uuid::Uuid; use crate::{ network::AsyncReadWrite, smtp::{ - DeliversMail, DummyDeliveryAgent, DummyRecipientResolver, ResolvesRecipient, - address::EmailAddress, + DeliversMail, DummyDeliveryAgent, DummyRecipientResolver, EmailMessage, MessageError, + ProtocolError, ReceivesNotifications, ResolvesRecipient, address::EmailAddress, }, }; pub(crate) const MAX_REPLY_LEN: usize = 256; -#[derive(thiserror::Error, Debug)] +/// Error that leads to session termination. +/// The only "expected" error is `Quit` that is caused by the client QUIT +/// command. +#[derive(thiserror::Error, Debug, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] pub enum SessionError { #[error("I/O error: {0}")] Io(#[from] io::Error), @@ -48,6 +52,8 @@ pub enum SessionError { Dns(#[from] NetError), #[error("Timed out")] Timeout, + #[error("TLS handshake failed: {0}")] + TlsHandshakeFailed(String), #[error("{0}")] SmtpError(#[from] SmtpError), #[error("Session terminated by client (QUIT)")] @@ -125,6 +131,7 @@ pub struct SessionConfig { pub authenticator: Arc, pub recipient_resolver: Arc, pub delivery_agent: Arc, + pub notifications_handler: Option>, } impl SessionConfig { @@ -165,6 +172,7 @@ impl SessionConfig { authenticator: Arc::new(MessageAuthenticator::new_cloudflare().unwrap()), recipient_resolver: Arc::new(DummyRecipientResolver), delivery_agent: Arc::new(DummyDeliveryAgent), + notifications_handler: None, } } @@ -218,8 +226,10 @@ impl Default for SessionState { } /// SMTP dynamic session data -#[derive(Debug, Default)] +#[derive(Clone, Debug)] pub struct SessionData { + message_id: Uuid, + last_error: Option, reverse_ip_verified: bool, ehlo_hostname: Option, mail_from: Option, @@ -227,19 +237,36 @@ pub struct SessionData { message: Vec, } +impl Default for SessionData { + fn default() -> Self { + Self { + #[cfg(not(test))] + message_id: Uuid::now_v7(), + #[cfg(test)] + message_id: Uuid::nil(), + last_error: None, + reverse_ip_verified: false, + ehlo_hostname: None, + mail_from: None, + rcpt_to: vec![], + message: vec![], + } + } +} + /// SMTP session counters -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct SessionCounters { - valid_until: Instant, - bytes_ingested: usize, - messages_queued: usize, - errors: usize, + pub started: Instant, + pub bytes_ingested: usize, + pub messages_queued: usize, + pub errors: usize, } impl SessionCounters { - fn new(ttl: Duration) -> Self { + pub(crate) fn new() -> Self { Self { - valid_until: Instant::now() + ttl, + started: Instant::now(), bytes_ingested: 0, messages_queued: 0, errors: 0, @@ -247,6 +274,20 @@ impl SessionCounters { } } +/// Session metadata for logging/notification purposes +#[derive(Clone, Debug)] +pub struct SessionMeta { + pub id: Uuid, + pub message_id: Uuid, + pub remote_ip: IpAddr, + pub tls_info: Option, + pub counters: SessionCounters, + pub last_error: Option, + pub ehlo_hostname: Option, + pub mail_from: Option, + pub rcpt_to: Vec, +} + /// SMTP Session pub struct Session { id: Uuid, @@ -273,648 +314,53 @@ impl Display for Session { impl Session { pub fn new(remote_ip: IpAddr, stream: S, cfg: Arc) -> Self { Self { + #[cfg(not(test))] id: Uuid::now_v7(), + #[cfg(test)] + id: Uuid::nil(), remote_ip, stream, state: SessionState::Greeting, data: SessionData::default(), - counters: SessionCounters::new(cfg.max_session_duration), + counters: SessionCounters::new(), cfg, tls_info: None, } } -} -#[cfg(test)] -mod tests { - use std::{net::SocketAddr, str::FromStr, sync::Mutex}; - - use async_trait::async_trait; - use fqdn::fqdn; - use ic_bn_lib_common::types::http::ListenerOpts; - use mail_parser::{Addr, Address, MessageParser}; - use mail_send::{SmtpClientBuilder, mail_builder::MessageBuilder}; - use rustls::{ClientConfig, pki_types::ServerName}; - use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; - use tokio_rustls::TlsConnector; - use tokio_util::sync::CancellationToken; - - use crate::{ - email, - network::listener::listen_tcp, - smtp::{ - DeliveryError, EmailMessage, RecipientPolicy, RecipientResolveError, - inbound::manager::SessionManager, server::Server, - }, - tests::{TEST_CERT_1, TEST_KEY_1}, - tls::{resolver::StubResolver, verify::NoopServerCertVerifier}, - }; - - use super::*; - - #[derive(Debug, Default)] - pub struct TestDeliveryAgent(Mutex>, Option); - - #[async_trait] - impl DeliversMail for TestDeliveryAgent { - async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError> { - if let Some(e) = &self.1 { - return Err(e.clone()); - } - - *self.0.lock().unwrap() = Some(message); - Ok(()) - } - } + fn set_error(&mut self, error: ProtocolError) { + self.data.last_error = Some(error.clone()); + self.counters.errors += 1; - #[derive(Debug)] - pub struct TestRecipientResolver( - EmailAddress, - Option, - Option>, - ); - - #[async_trait] - impl ResolvesRecipient for TestRecipientResolver { - async fn resolve_recipient( - &self, - _from: &EmailAddress, - rcpt: &EmailAddress, - ) -> Result { - assert_eq!(rcpt, &self.0); - if let Some(v) = &self.1 { - return Ok(RecipientPolicy::Rewrite(v.clone())); - } - - if let Some(v) = &self.2 { - return Ok(RecipientPolicy::Expand(v.clone())); - } - - Ok(RecipientPolicy::Accept) + if let Some(v) = self.cfg.notifications_handler.clone() { + let meta = self.meta(); + tokio::spawn(async move { v.notify_protocol_error(meta, error).await }); } } - fn create_session( - stream: S, - greeting_delay: Option, - ) -> Session { - let mut cfg = SessionConfig::new("test", 512); - cfg.max_errors = 5; - cfg.greeting_delay = greeting_delay; - cfg.max_messages_per_session = 3; - cfg.max_session_data = 8192; - cfg.max_recipients = 3; - - Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)) - } - - fn create_basic_stream() -> tokio_test::io::Builder { - let mut builder = tokio_test::io::Builder::new(); - - builder.write(b"220 test ESMTP IC SMTP Gateway\r\n") - .read(b"HELO foo.bar\r\n") - .write(b"250 test you had me at HELO\r\n") - .read(b"EHLO foo.bar\r\n") - .write(b"250-test you had me at EHLO\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); - - builder - } - - fn stream_send_message(b: &mut tokio_test::io::Builder) { - b.read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"DATA\r\n") - .write(b"354 Start mail input; end with .\r\n") - .read(b"foobarmessage\r\n.\r\n") - .write(b"250 2.0.0 Message (13 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n"); - } - - #[tokio::test] - async fn test_ehlo_required() { - let stream = tokio_test::io::Builder::new() - .write(b"220 test ESMTP IC SMTP Gateway\r\n") - .read(b"MAIL FROM:\r\n") - .write(b"503 5.5.1 Polite people say EHLO first.\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_pipelining() { - let mut builder = create_basic_stream(); - let stream = builder - .read(b"MAIL FROM:\r\nRCPT TO:\r\nDATA\r\n") - .write(b"250 2.1.0 OK\r\n250 2.1.5 OK\r\n354 Start mail input; end with .\r\n") - .read(b"foobarmessage\r\n.\r\n") - .write(b"250 2.0.0 Message (13 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_basic_session() { - let mut builder = create_basic_stream(); - builder - .read(b"RCPT TO:\r\n") - .write(b"503 5.5.1 MAIL FROM is required first.\r\n") - .read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"MAIL FROM:\r\n") - .write(b"503 5.5.1 Multiple MAIL FROM commands are not allowed.\r\n") - .read(b"DATA\r\n") - .write(b"503 5.5.1 RCPT TO is required first.\r\n") - .read(b"RSET\r\n") - .write(b"250 2.0.0 OK\r\n") - .read(b"NOOP\r\n") - .write(b"250 2.0.0 OK\r\n") - .read(b"FOOB\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"HELP\r\n") - .write(b"502 5.5.1 Command not implemented.\r\n"); - - stream_send_message(&mut builder); - let stream = builder - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_client_sends_before_greeting() { - let stream = tokio_test::io::Builder::new() - .read(b"EHLO foo.bar\r\n") - .write(b"501 5.7.1 Client sent command before greeting banner.\r\n") - .build(); - - let mut session = create_session(stream, Some(Duration::from_millis(100))); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::SendsBeforeGreeting - )); - } - - #[tokio::test] - async fn test_bdat() { - let stream = create_basic_stream() - .read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"BDAT 10\r\n") - .read(b"01234") - .read(b"56789") - .write(b"250 2.6.0 Chunk accepted.\r\n") - .read(b"BDAT 10\r\n") - .read(b"987") - .read(b"654") - .read(b"3210") - .write(b"250 2.6.0 Chunk accepted.\r\n") - .read(b"BDAT 10 LAST\r\n") - .read(b"0123456789") - .write(b"250 2.0.0 Message (30 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let agent = Arc::new(TestDeliveryAgent::default()); - let resolver = TestRecipientResolver( - "dead@beef".try_into().unwrap(), - Some("bar@baz".try_into().unwrap()), - None, - ); - - let mut cfg = SessionConfig::new("test", 512); - cfg.delivery_agent = agent.clone(); - cfg.recipient_resolver = Arc::new(resolver); - - let mut session = Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - - // Make sure the agent gets the correct mail - assert_eq!( - agent.0.lock().unwrap().clone(), - Some(EmailMessage { - id: Uuid::nil(), - ehlo_hostname: fqdn!("foo.bar"), - mail_from: "foo@bar".try_into().unwrap(), - rcpt_to: vec!["bar@baz".try_into().unwrap()], - body: "012345678998765432100123456789".into(), - }) - ); + fn notify_message(&self, msg: Arc, error: Option) { + if let Some(v) = self.cfg.notifications_handler.clone() { + let meta = self.meta(); + tokio::spawn(async move { + v.notify_message(meta, msg, error).await; + }); + }; } - #[tokio::test] - async fn test_data() { - let mut builder = create_basic_stream(); - stream_send_message(&mut builder); - - let stream = builder - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let agent = Arc::new(TestDeliveryAgent::default()); - let resolver = TestRecipientResolver( - "dead@beef".try_into().unwrap(), - Some("bar@baz".try_into().unwrap()), - None, - ); - - let mut cfg = SessionConfig::new("test", 512); - cfg.delivery_agent = agent.clone(); - cfg.recipient_resolver = Arc::new(resolver); - - let mut session = Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - - // Make sure the agent gets the correct mail - assert_eq!( - agent.0.lock().unwrap().clone(), - Some(EmailMessage { - id: Uuid::nil(), - ehlo_hostname: fqdn!("foo.bar"), - mail_from: email!("foo@bar"), - rcpt_to: vec![email!("bar@baz")], - body: "foobarmessage".into(), - }) - ) - } - - #[tokio::test] - async fn test_expand() { - let mut builder = create_basic_stream(); - stream_send_message(&mut builder); - - let stream = builder - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let agent = Arc::new(TestDeliveryAgent::default()); - let resolver = TestRecipientResolver( - "dead@beef".try_into().unwrap(), - None, - Some(vec![ - "dead@dead".try_into().unwrap(), - "bar@bax".try_into().unwrap(), - ]), - ); - - let mut cfg = SessionConfig::new("test", 512); - cfg.delivery_agent = agent.clone(); - cfg.recipient_resolver = Arc::new(resolver); - - let mut session = Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - - // Make sure the agent gets the correct mail - assert_eq!( - agent.0.lock().unwrap().clone(), - Some(EmailMessage { - id: Uuid::nil(), - ehlo_hostname: fqdn!("foo.bar"), - mail_from: email!("foo@bar"), - rcpt_to: vec![email!("dead@beef"), email!("dead@dead"), email!("bar@bax"),], - body: "foobarmessage".into(), - }) - ) - } - - #[tokio::test] - async fn test_max_recipients() { - let stream = create_basic_stream() - .read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"455 4.5.3 Too many recipients.\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_max_message_size() { - let stream = create_basic_stream() - .read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"DATA\r\n") - .write(b"354 Start mail input; end with .\r\n") - .read(format!("{}\r\n.\r\n", "1".repeat(513)).as_bytes()) - .write(b"552 5.3.4 Message too big, we accept up to 512 bytes.\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_max_messages_per_session() { - let mut builder = create_basic_stream(); - stream_send_message(&mut builder); - stream_send_message(&mut builder); - stream_send_message(&mut builder); - - let stream = builder - .read(b"MAIL FROM:\r\n") - .write(b"250 2.1.0 OK\r\n") - .read(b"RCPT TO:\r\n") - .write(b"250 2.1.5 OK\r\n") - .read(b"DATA\r\n") - .write(b"452 4.4.5 Maximum number of messages per session exceeded.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::TooManyMessagesPerSession - )); - } - - #[tokio::test] - async fn test_max_errors() { - let stream = tokio_test::io::Builder::new() - .write(b"220 test ESMTP IC SMTP Gateway\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"500 5.5.1 Invalid command.\r\n") - .read(b"FOO\r\n") - .write(b"452 4.3.2 Too many errors.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::TooManyErrors - )); - } - - #[tokio::test] - async fn test_request_too_large() { - let stream = tokio_test::io::Builder::new() - .write(b"220 test ESMTP IC SMTP Gateway\r\n") - .read(format!("EHLO {}", "1".repeat(2048)).as_bytes()) - .read(format!("{}\r\n", "1".repeat(2048)).as_bytes()) - .write(b"554 5.3.4 Line is too long.\r\n") - .read(b"QUIT\r\n") - .write(b"221 2.0.0 Bye.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::Quit - )); - } - - #[tokio::test] - async fn test_max_session_transfer_quota() { - let stream = tokio_test::io::Builder::new() - .write(b"220 test ESMTP IC SMTP Gateway\r\n") - .read(format!("EHLO {}\r\n", "1".repeat(8192)).as_bytes()) - .write(b"452 4.7.28 Session transfer quota exceeded.\r\n") - .build(); - - let mut session = create_session(stream, None); - - assert!(matches!( - session.handle(CancellationToken::new()).await.unwrap_err(), - SessionError::TransferQuotaExceeded(_) - )); - } - - #[tokio::test] - async fn test_starttls() { - rustls::crypto::aws_lc_rs::default_provider() - .install_default() - .ok(); - - // Use an in-memory pipe - let (stream1, mut stream2) = duplex(8192); - - let rustls_server_cfg = ServerConfig::builder() - .with_no_client_auth() - .with_cert_resolver(Arc::new( - StubResolver::new(TEST_CERT_1.as_bytes(), TEST_KEY_1.as_bytes()).unwrap(), - )); - - let mut cfg = SessionConfig::new("test", 512); - cfg.tls_mode = SessionTlsMode::Required(Arc::new(rustls_server_cfg)); - - tokio::spawn(async move { - SessionManager::handle_connection( - stream1, - SocketAddr::from_str("1.1.1.1:123").unwrap(), - Arc::new(cfg), - CancellationToken::new(), - ) - .await; - }); - - let mut buf = vec![0; 8192]; - - let r = stream2.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"220 test ESMTP IC SMTP Gateway\r\n"); - - // Make sure EHLO advertises 250-STARTTLS - stream2.write_all(b"EHLO foo.bar\r\n").await.unwrap(); - let r = stream2.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"250-test you had me at EHLO\r\n250-STARTTLS\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); - - // Make sure TLS is required by the server due to SessionTlsMode::Required - stream2.write_all(b"MAIL FROM:\r\n").await.unwrap(); - let r = stream2.read(&mut buf).await.unwrap(); - assert_eq!( - &buf[..r], - b"503 5.5.1 TLS is required to submit mail on this server.\r\n" - ); - - // Fire up TLS handshake - stream2.write_all(b"STARTTLS\r\n").await.unwrap(); - let r = stream2.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"220 2.0.0 Ready to start TLS.\r\n"); - - let rustls_client_cfg = ClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(Arc::new(NoopServerCertVerifier::default())) - .with_no_client_auth(); - let tls_connector = TlsConnector::from(Arc::new(rustls_client_cfg)); - let mut tls_stream = tls_connector - .connect(ServerName::try_from("foo").unwrap(), stream2) - .await - .unwrap(); - - // Make sure there's no 250-STARTTLS and 250-REQUIRETLS in EHLO anymore inside TLS session - tls_stream.write_all(b"EHLO foo.bar\r\n").await.unwrap(); - let r = tls_stream.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"250-test you had me at EHLO\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); - - // No TLS-in-TLS allowed - tls_stream.write_all(b"STARTTLS\r\n").await.unwrap(); - let r = tls_stream.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"504 5.7.4 Already in TLS mode.\r\n"); - - // Now MAIL FROM should work - tls_stream.write_all(b"MAIL FROM:\r\n").await.unwrap(); - let r = tls_stream.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"250 2.1.0 OK\r\n"); - - // All good - tls_stream.write_all(b"QUIT\r\n").await.unwrap(); - let r = tls_stream.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..r], b"221 2.0.0 Bye.\r\n"); - } - - #[tokio::test] - async fn test_with_smtp_client() { - rustls::crypto::aws_lc_rs::default_provider() - .install_default() - .ok(); - - let rustls_server_cfg = ServerConfig::builder() - .with_no_client_auth() - .with_cert_resolver(Arc::new( - StubResolver::new(TEST_CERT_1.as_bytes(), TEST_KEY_1.as_bytes()).unwrap(), - )); - - let agent = Arc::new(TestDeliveryAgent::default()); - - // Listen on the random free port - let listener = listen_tcp("127.0.0.1:0".parse().unwrap(), ListenerOpts::default()).unwrap(); - let port = listener.local_addr().unwrap().port(); - - let mut cfg = SessionConfig::new("test", 10 * 1024 * 1024); - cfg.delivery_agent = agent.clone(); - cfg.tls_mode = SessionTlsMode::Allowed(Arc::new(rustls_server_cfg)); - - let server = Server::new_with_listener(listener, cfg).unwrap(); - - tokio::spawn(async move { - server.serve(CancellationToken::new()).await.unwrap(); - }); - - let message = MessageBuilder::new() - .from(("John Doe", "john@doe.com")) - .to(("Jane Doe", "jane@doe.com")) - .subject("Hello") - .text_body("Blah"); - - let mut client = SmtpClientBuilder::new("127.0.0.1", port) - .unwrap() - .implicit_tls(false) - .helo_host("foo.bar") - .allow_invalid_certs() - .connect() - .await - .unwrap(); - - // Try some commands - client.noop().await.unwrap(); - client.rset().await.unwrap(); - - // Make sure we have the required caps - let caps = client.capabilities("foo.bar", false).await.unwrap(); - assert_eq!( - caps.capabilities, - EXT_SMTP_UTF8 - | EXT_8BIT_MIME - | EXT_CHUNKING - | EXT_ENHANCED_STATUS_CODES - | EXT_SIZE - | EXT_PIPELINING - ); - - client.send(message).await.unwrap(); - - // Make sure the agent gets the correct mail - let msg = agent.0.lock().unwrap().clone().unwrap(); - assert_eq!(msg.id, Uuid::nil()); - assert_eq!(msg.ehlo_hostname, "foo.bar"); - assert_eq!(msg.mail_from, "john@doe.com"); - assert_eq!(msg.rcpt_to, vec!["jane@doe.com"]); - - let parsed = MessageParser::new().parse(&msg.body).unwrap(); - assert_eq!(parsed.subject(), Some("Hello")); - assert_eq!( - *parsed.from().unwrap(), - Address::List(vec![Addr::new(Some("John Doe"), "john@doe.com")]) - ); - assert_eq!( - *parsed.to().unwrap(), - Address::List(vec![Addr::new(Some("Jane Doe"), "jane@doe.com")]) - ); - assert_eq!(parsed.body_text(0).unwrap(), "Blah"); + fn meta(&self) -> SessionMeta { + SessionMeta { + id: self.id, + message_id: self.data.message_id, + remote_ip: self.remote_ip, + tls_info: self.tls_info.clone(), + counters: self.counters.clone(), + last_error: self.data.last_error.clone(), + ehlo_hostname: self.data.ehlo_hostname.clone(), + mail_from: self.data.mail_from.clone(), + rcpt_to: self.data.rcpt_to.clone(), + } } } + +#[cfg(test)] +mod tests; diff --git a/ic-bn-lib/src/smtp/inbound/rcpt_to.rs b/ic-bn-lib/src/smtp/inbound/rcpt_to.rs index 72c49d8..736d17d 100644 --- a/ic-bn-lib/src/smtp/inbound/rcpt_to.rs +++ b/ic-bn-lib/src/smtp/inbound/rcpt_to.rs @@ -8,7 +8,7 @@ use tracing::{debug, info}; use crate::{ network::AsyncReadWrite, smtp::{ - RecipientPolicy, RecipientResolveError, + ProtocolError, RecipientPolicy, RecipientResolveError, address::EmailAddress, inbound::{MAX_REPLY_LEN, Session, SessionResult}, }, @@ -19,6 +19,9 @@ impl Session { /// Handles RCPT TO command pub async fn handle_rcpt_to(&mut self, to: RcptTo>) -> SessionResult<()> { let Some(mail_from) = &self.data.mail_from else { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "RCPT TO before MAIL FROM".into(), + )); return self .reply("503", "5.5.1", "MAIL FROM is required first.") .await; @@ -35,6 +38,10 @@ impl Session { let Ok(address) = EmailAddress::from_str(&to.address) else { info!("{self}: {}: incorrect address", to.address); + self.set_error(ProtocolError::RecipientValidationFailed(format!( + "Incorrect address: {}", + to.address + ))); return self.reply("550", "5.1.2", "Incorrect address.").await; }; @@ -44,6 +51,11 @@ impl Session { if self.data.rcpt_to.len() >= self.cfg.max_recipients { info!("{self}: {}: too many recipients", to.address); + self.set_error(ProtocolError::RecipientValidationFailed(format!( + "Too many recipients: {} > {}", + self.data.rcpt_to.len(), + self.cfg.max_recipients + ))); return self.reply("455", "4.5.3", "Too many recipients.").await; } @@ -72,6 +84,9 @@ impl Session { Err(e) => { info!("{self}: {}: recipient resolution error: {e:#}", to.address); + self.set_error(ProtocolError::RecipientValidationFailed(format!( + "Recipient resolution failed: {e:#}", + ))); return match e { RecipientResolveError::UnknownDomain => { diff --git a/ic-bn-lib/src/smtp/inbound/session.rs b/ic-bn-lib/src/smtp/inbound/session.rs index 113f901..7e79686 100644 --- a/ic-bn-lib/src/smtp/inbound/session.rs +++ b/ic-bn-lib/src/smtp/inbound/session.rs @@ -2,6 +2,7 @@ use std::{ borrow::Cow, fmt::{self, Write as _}, io::Write as _, + sync::Arc, time::{Duration, Instant}, }; @@ -23,7 +24,7 @@ use uuid::Uuid; use crate::{ network::AsyncReadWrite, smtp::{ - DeliveryError, EmailMessage, + DeliveryError, EmailMessage, MessageError, ProtocolError, inbound::{ MAX_REPLY_LEN, Session, SessionError, SessionResult, SessionState, SessionUpgrade, }, @@ -90,6 +91,11 @@ impl Session { pub(crate) async fn message_too_big(&mut self) -> SessionResult<()> { let max_size = self.cfg.max_message_size; + self.set_error(ProtocolError::MessageTooBig(format!( + "{} > {}", + self.data.message.len(), + max_size + ))); self.reply_with("552", "5.3.4", |buf| { write!(buf, "Message too big, we accept up to {max_size} bytes.",) }) @@ -125,7 +131,7 @@ impl Session { } async fn handle_error(&mut self, error: SmtpError) -> SessionResult<()> { - self.counters.errors += 1; + self.set_error(ProtocolError::SmtpError(error.to_string())); let (code, ext, msg) = match error { SmtpError::UnknownCommand | SmtpError::InvalidResponse { .. } => { @@ -196,9 +202,9 @@ impl Session { self.reply("250", "2.0.0", "OK").await?; } _ => { + self.set_error(ProtocolError::SmtpError("Command not implemented".into())); self.reply("502", "5.5.1", "Command not implemented.") .await?; - self.counters.errors += 1; } } @@ -217,7 +223,7 @@ impl Session { } // Check if we are over session time quota - if Instant::now() > self.counters.valid_until { + if Instant::now() > self.counters.started + self.cfg.max_session_duration { self.reply("452", "4.3.2", "Session open for too long.") .await?; return Err(SessionError::TtlExceeded( @@ -284,11 +290,15 @@ impl Session { Request::StartTls => { debug!("{self}: <- STARTTLS"); if self.tls_info.is_some() { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "STARTTLS inside STARTTLS".into(), + )); self.reply("504", "5.7.4", "Already in TLS mode.").await?; - self.counters.errors += 1; } else if !self.cfg.tls_mode.enabled() { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "STARTTLS without TLS enabled".into(), + )); self.reply("502", "5.7.0", "TLS not available.").await?; - self.counters.errors += 1; } else { self.reply("220", "2.0.0", "Ready to start TLS.").await?; self.state = state; @@ -347,9 +357,9 @@ impl Session { SessionState::RequestTooLarge(rx) => { // If line-feed found - issue error, otherwise keep ingesting if rx.ingest(&mut iter) { + self.set_error(ProtocolError::SmtpError("Line is too long".into())); self.reply("554", "5.3.4", "Line is too long.").await?; state = SessionState::default(); - self.counters.errors += 1; } else { // No line-feed found yet break; @@ -360,7 +370,6 @@ impl Session { if rx.ingest(&mut iter) { self.message_too_big().await?; state = SessionState::default(); - self.counters.errors += 1; } else { // No end-of-message marker found yet break; @@ -422,26 +431,26 @@ impl Session { async fn verify_message( &mut self, #[allow(unused_variables)] msg: &EmailMessage, - ) -> SessionResult { + ) -> SessionResult> { #[cfg(test)] { - return Ok(true); + return Ok(None); } let Some(auth_message) = AuthenticatedMessage::parse(&msg.body) else { info!( - "{self}: {}: {} -> {:?}: message parsing failed", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{self}: {} -> {:?}: message parsing failed", + msg.mail_from, msg.rcpt_to ); self.reply("550", "5.7.7", "Failed to parse the message") .await?; - return Ok(false); + return Ok(Some(MessageError::ParsingFailed)); }; if auth_message.received_headers_count() > self.cfg.max_received_headers { info!( - "{self}: {}: {} -> {:?}: message verification failed: too many 'Received' headers", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{self}: {} -> {:?}: message verification failed: too many 'Received' headers", + msg.mail_from, msg.rcpt_to ); self.reply( "450", @@ -449,7 +458,7 @@ impl Session { "Too many 'Received' headers. Possible loop detected.", ) .await?; - return Ok(false); + return Ok(Some(MessageError::TooManyReceivedHeaders)); } if self.cfg.verify_dkim { @@ -459,23 +468,23 @@ impl Session { let log_name = self.to_string(); // Replies with either a temporary or a permanent error code - let mut reply = async || -> SessionResult<()> { + let mut reply = async |error: &str| -> SessionResult<()> { if outputs .iter() .any(|x| matches!(x.result(), DkimResult::TempError(_))) { // If any of the signatures that failed with a temporary error - return temporary SMTP error code info!( - "{log_name}: {}: {} -> {:?}: DKIM verification temporary failure", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{log_name}: {} -> {:?}: DKIM verification temporary failure: {error}", + msg.mail_from, msg.rcpt_to ); self.reply("451", "4.7.20", "DKIM validation temporary failure.") .await } else { // Otherwise permanent info!( - "{log_name}: {}: {} -> {:?}: DKIM verification failure", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{log_name}: {} -> {:?}: DKIM verification failure: {error}", + msg.mail_from, msg.rcpt_to ); self.reply("550", "5.7.20", "DKIM validation failed.").await } @@ -486,71 +495,83 @@ impl Session { .filter(|x| matches!(x.result(), DkimResult::Pass | DkimResult::None)) .count(); - if strict { - // Check that *all* signatures have passed validation (or there are none) - if outputs.len() != signatures_passed { - reply().await?; - return Ok(false); - } + // Get first validation error (if any) + let first_error = outputs + .iter() + .filter_map(|x| match x.result() { + DkimResult::Fail(e) | DkimResult::PermError(e) | DkimResult::TempError(e) => { + Some(e.to_string()) + } + _ => None, + }) + .next(); + + // Strict: Check that *all* signatures have passed validation (or there are none) + // else: Check if *any* of the signatures have passed validation (or there are none) + let is_valid = if strict { + signatures_passed == outputs.len() } else { - // Check if *any* of the signatures have passed validation (or there are none) - if !outputs.is_empty() && signatures_passed == 0 { - reply().await?; - return Ok(false); - } + signatures_passed != 0 || outputs.is_empty() + }; + + if !is_valid { + let error = first_error.unwrap_or_default(); + reply(&error).await?; + return Ok(Some(MessageError::DkimValidationFailed(error))); } if signatures_passed > 0 { debug!( - "{log_name}: {}: {} -> {:?}: DKIM validation succeeded ({signatures_passed}/{} signatures passed)", - msg.ehlo_hostname, + "{log_name}: {} -> {:?}: DKIM validation succeeded ({signatures_passed}/{} signatures passed)", msg.mail_from, msg.rcpt_to, outputs.len() ); } else { debug!( - "{log_name}: {}: {} -> {:?}: DKIM validation succeeded ({signatures_passed}/{} signatures passed)", - msg.ehlo_hostname, - msg.mail_from, - msg.rcpt_to, - outputs.len() + "{log_name}: {} -> {:?}: No DKIM signatures found", + msg.mail_from, msg.rcpt_to, ); } } - Ok(true) + Ok(None) } async fn queue_message(&mut self) -> SessionResult<()> { - #[cfg(not(test))] - let id = Uuid::now_v7(); - #[cfg(test)] - let id = Uuid::nil(); - let message_size = self.data.message.len(); + let id = self.data.message_id; // SAFETY: Code makes sure these are all Some(). // It's better to panic in tests if they are not. - let msg = EmailMessage { + let msg = Arc::new(EmailMessage { id, - ehlo_hostname: self.data.ehlo_hostname.clone().unwrap(), mail_from: self.data.mail_from.take().unwrap(), rcpt_to: std::mem::take(&mut self.data.rcpt_to), body: std::mem::take(&mut self.data.message).into(), - }; + }); // Run configured verification steps on the message body - if !self.verify_message(&msg).await? { + if let Some(e) = self.verify_message(&msg).await? { + self.notify_message(msg.clone(), Some(e)); self.reset_message(); return Ok(()); } - if let Err(e) = self.cfg.delivery_agent.deliver_mail(msg.clone()).await { + // Deliver the message. + // Message cloning is rather lightweight (body is Bytes) + if let Err(e) = self + .cfg + .delivery_agent + .deliver_mail(self.meta(), msg.clone()) + .await + { info!( - "{self}: {}: {} -> {:?}: message delivery failed: {e:#}", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{self}: {} -> {:?}: message delivery failed: {e:#}", + msg.mail_from, msg.rcpt_to ); + + self.notify_message(msg.clone(), Some(MessageError::DeliveryFailed(e.clone()))); self.reset_message(); return match e { @@ -569,9 +590,11 @@ impl Session { }; } + self.notify_message(msg.clone(), None); + info!( - "{self}: {}: {} -> {:?}: message ({message_size} bytes) queued with id {id}", - msg.ehlo_hostname, msg.mail_from, msg.rcpt_to + "{self}: {} -> {:?}: message ({message_size} bytes) queued with id {id}", + msg.mail_from, msg.rcpt_to ); self.reply_with("250", "2.0.0", |buf| { write!(buf, "Message ({message_size} bytes) queued with id {id}") @@ -593,9 +616,11 @@ impl Session { .await?; return Err(SessionError::TooManyMessagesPerSession); } else if self.data.rcpt_to.is_empty() { + self.set_error(ProtocolError::InvalidSequenceOfCommands( + "DATA before RCPT TO".into(), + )); self.reply("503", "5.5.1", "RCPT TO is required first.") .await?; - self.counters.errors += 1; return Ok(false); } @@ -604,6 +629,15 @@ impl Session { /// Resets the message-related fields to their initial state pub(crate) fn reset_message(&mut self) { + #[cfg(not(test))] + { + self.data.message_id = Uuid::now_v7(); + } + #[cfg(test)] + { + self.data.message_id = Uuid::nil(); + } + self.data.mail_from = None; self.data.rcpt_to.clear(); self.data.message.clear(); diff --git a/ic-bn-lib/src/smtp/inbound/tests.rs b/ic-bn-lib/src/smtp/inbound/tests.rs new file mode 100644 index 0000000..f2b89d2 --- /dev/null +++ b/ic-bn-lib/src/smtp/inbound/tests.rs @@ -0,0 +1,711 @@ +use std::{net::SocketAddr, str::FromStr, sync::Mutex}; + +use async_trait::async_trait; +use ic_bn_lib_common::types::http::ListenerOpts; +use mail_parser::{Addr, Address, MessageParser}; +use mail_send::{SmtpClientBuilder, mail_builder::MessageBuilder}; +use rustls::{ClientConfig, ProtocolVersion, pki_types::ServerName}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; +use tokio_rustls::TlsConnector; +use tokio_util::sync::CancellationToken; + +use crate::{ + email, + network::listener::listen_tcp, + smtp::{ + DeliveryError, EmailMessage, MessageError, RecipientPolicy, RecipientResolveError, + inbound::manager::SessionManager, server::Server, + }, + tests::{TEST_CERT_1, TEST_KEY_1}, + tls::{resolver::StubResolver, verify::NoopServerCertVerifier}, +}; + +use super::*; + +#[derive(Debug, Default)] +pub struct TestDeliveryAgent(Mutex>>, Option); + +#[async_trait] +impl DeliversMail for TestDeliveryAgent { + async fn deliver_mail( + &self, + _meta: SessionMeta, + message: Arc, + ) -> Result<(), DeliveryError> { + if let Some(e) = &self.1 { + return Err(e.clone()); + } + + *self.0.lock().unwrap() = Some(message); + Ok(()) + } +} + +#[allow(clippy::type_complexity)] +#[derive(Debug, Default)] +pub struct TestNotificationsReceiver { + msg: Mutex, Option)>>, + sess: Mutex)>>, + proto_error: Mutex>, +} + +#[async_trait] +impl ReceivesNotifications for TestNotificationsReceiver { + async fn notify_message( + &self, + meta: SessionMeta, + message: Arc, + error: Option, + ) { + *self.msg.lock().unwrap() = Some((meta, message, error)); + } + + async fn notify_protocol_error(&self, meta: SessionMeta, error: ProtocolError) { + *self.proto_error.lock().unwrap() = Some((meta, error)); + } + + async fn notify_session_finish(&self, meta: SessionMeta, error: Option) { + *self.sess.lock().unwrap() = Some((meta, error)); + } +} + +#[derive(Debug)] +pub struct TestRecipientResolver( + EmailAddress, + Option, + Option>, +); + +#[async_trait] +impl ResolvesRecipient for TestRecipientResolver { + async fn resolve_recipient( + &self, + _from: &EmailAddress, + rcpt: &EmailAddress, + ) -> Result { + assert_eq!(rcpt, &self.0); + if let Some(v) = &self.1 { + return Ok(RecipientPolicy::Rewrite(v.clone())); + } + + if let Some(v) = &self.2 { + return Ok(RecipientPolicy::Expand(v.clone())); + } + + Ok(RecipientPolicy::Accept) + } +} + +fn create_session(stream: S, greeting_delay: Option) -> Session { + let mut cfg = SessionConfig::new("test", 512); + cfg.max_errors = 5; + cfg.greeting_delay = greeting_delay; + cfg.max_messages_per_session = 3; + cfg.max_session_data = 8192; + cfg.max_recipients = 3; + + Session::new(IpAddr::from_str("1.1.1.1").unwrap(), stream, Arc::new(cfg)) +} + +fn create_basic_stream() -> tokio_test::io::Builder { + let mut builder = tokio_test::io::Builder::new(); + + builder.write(b"220 test ESMTP IC SMTP Gateway\r\n") + .read(b"HELO foo.bar\r\n") + .write(b"250 test you had me at HELO\r\n") + .read(b"EHLO foo.bar\r\n") + .write(b"250-test you had me at EHLO\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); + + builder +} + +fn stream_send_message(b: &mut tokio_test::io::Builder) { + b.read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"DATA\r\n") + .write(b"354 Start mail input; end with .\r\n") + .read(b"foobarmessage\r\n.\r\n") + .write( + b"250 2.0.0 Message (13 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n", + ); +} + +#[tokio::test] +async fn test_ehlo_required() { + let stream = tokio_test::io::Builder::new() + .write(b"220 test ESMTP IC SMTP Gateway\r\n") + .read(b"MAIL FROM:\r\n") + .write(b"503 5.5.1 Polite people say EHLO first.\r\n") + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_pipelining() { + let mut builder = create_basic_stream(); + let stream = builder + .read(b"MAIL FROM:\r\nRCPT TO:\r\nDATA\r\n") + .write(b"250 2.1.0 OK\r\n250 2.1.5 OK\r\n354 Start mail input; end with .\r\n") + .read(b"foobarmessage\r\n.\r\n") + .write( + b"250 2.0.0 Message (13 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n", + ) + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_basic_session() { + let mut builder = create_basic_stream(); + builder + .read(b"RCPT TO:\r\n") + .write(b"503 5.5.1 MAIL FROM is required first.\r\n") + .read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"MAIL FROM:\r\n") + .write(b"503 5.5.1 Multiple MAIL FROM commands are not allowed.\r\n") + .read(b"DATA\r\n") + .write(b"503 5.5.1 RCPT TO is required first.\r\n") + .read(b"RSET\r\n") + .write(b"250 2.0.0 OK\r\n") + .read(b"NOOP\r\n") + .write(b"250 2.0.0 OK\r\n") + .read(b"FOOB\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"HELP\r\n") + .write(b"502 5.5.1 Command not implemented.\r\n"); + + stream_send_message(&mut builder); + let stream = builder + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_client_sends_before_greeting() { + let stream = tokio_test::io::Builder::new() + .read(b"EHLO foo.bar\r\n") + .write(b"501 5.7.1 Client sent command before greeting banner.\r\n") + .build(); + + let mut session = create_session(stream, Some(Duration::from_millis(100))); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::SendsBeforeGreeting + )); +} + +#[tokio::test] +async fn test_bdat() { + let stream = create_basic_stream() + .read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"BDAT 10\r\n") + .read(b"01234") + .read(b"56789") + .write(b"250 2.6.0 Chunk accepted.\r\n") + .read(b"BDAT 10\r\n") + .read(b"987") + .read(b"654") + .read(b"3210") + .write(b"250 2.6.0 Chunk accepted.\r\n") + .read(b"BDAT 10 LAST\r\n") + .read(b"0123456789") + .write( + b"250 2.0.0 Message (30 bytes) queued with id 00000000-0000-0000-0000-000000000000\r\n", + ) + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let agent = Arc::new(TestDeliveryAgent::default()); + let resolver = TestRecipientResolver( + "dead@beef".try_into().unwrap(), + Some("bar@baz".try_into().unwrap()), + None, + ); + + let mut cfg = SessionConfig::new("test", 512); + cfg.delivery_agent = agent.clone(); + cfg.recipient_resolver = Arc::new(resolver); + + let remote_ip = IpAddr::from_str("1.1.1.1").unwrap(); + let mut session = Session::new(remote_ip, stream, Arc::new(cfg)); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); + + // Make sure the agent gets the correct mail + assert_eq!( + agent.0.lock().unwrap().clone().unwrap().as_ref(), + &EmailMessage { + id: Uuid::nil(), + mail_from: "foo@bar".try_into().unwrap(), + rcpt_to: vec!["bar@baz".try_into().unwrap()], + body: "012345678998765432100123456789".into(), + } + ); +} + +#[tokio::test] +async fn test_data() { + let mut builder = create_basic_stream(); + stream_send_message(&mut builder); + + let stream = builder + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let agent = Arc::new(TestDeliveryAgent::default()); + let resolver = TestRecipientResolver( + "dead@beef".try_into().unwrap(), + Some("bar@baz".try_into().unwrap()), + None, + ); + + let mut cfg = SessionConfig::new("test", 512); + cfg.delivery_agent = agent.clone(); + cfg.recipient_resolver = Arc::new(resolver); + + let remote_ip = IpAddr::from_str("1.1.1.1").unwrap(); + let mut session = Session::new(remote_ip, stream, Arc::new(cfg)); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); + + // Make sure the agent gets the correct mail + assert_eq!( + agent.0.lock().unwrap().clone().unwrap().as_ref(), + &EmailMessage { + id: Uuid::nil(), + mail_from: email!("foo@bar"), + rcpt_to: vec![email!("bar@baz")], + body: "foobarmessage".into(), + } + ) +} + +#[tokio::test] +async fn test_expand() { + let mut builder = create_basic_stream(); + stream_send_message(&mut builder); + + let stream = builder + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let agent = Arc::new(TestDeliveryAgent::default()); + let resolver = TestRecipientResolver( + "dead@beef".try_into().unwrap(), + None, + Some(vec![ + "dead@dead".try_into().unwrap(), + "bar@bax".try_into().unwrap(), + ]), + ); + + let mut cfg = SessionConfig::new("test", 512); + cfg.delivery_agent = agent.clone(); + cfg.recipient_resolver = Arc::new(resolver); + + let remote_ip = IpAddr::from_str("1.1.1.1").unwrap(); + let mut session = Session::new(remote_ip, stream, Arc::new(cfg)); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); + + // Make sure the agent gets the correct mail + assert_eq!( + agent.0.lock().unwrap().clone().unwrap().as_ref(), + &EmailMessage { + id: Uuid::nil(), + mail_from: email!("foo@bar"), + rcpt_to: vec![email!("dead@beef"), email!("dead@dead"), email!("bar@bax"),], + body: "foobarmessage".into(), + } + ) +} + +#[tokio::test] +async fn test_max_recipients() { + let stream = create_basic_stream() + .read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"455 4.5.3 Too many recipients.\r\n") + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_max_message_size() { + let stream = create_basic_stream() + .read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"DATA\r\n") + .write(b"354 Start mail input; end with .\r\n") + .read(format!("{}\r\n.\r\n", "1".repeat(513)).as_bytes()) + .write(b"552 5.3.4 Message too big, we accept up to 512 bytes.\r\n") + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_max_messages_per_session() { + let mut builder = create_basic_stream(); + stream_send_message(&mut builder); + stream_send_message(&mut builder); + stream_send_message(&mut builder); + + let stream = builder + .read(b"MAIL FROM:\r\n") + .write(b"250 2.1.0 OK\r\n") + .read(b"RCPT TO:\r\n") + .write(b"250 2.1.5 OK\r\n") + .read(b"DATA\r\n") + .write(b"452 4.4.5 Maximum number of messages per session exceeded.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::TooManyMessagesPerSession + )); +} + +#[tokio::test] +async fn test_max_errors() { + let stream = tokio_test::io::Builder::new() + .write(b"220 test ESMTP IC SMTP Gateway\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"500 5.5.1 Invalid command.\r\n") + .read(b"FOO\r\n") + .write(b"452 4.3.2 Too many errors.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::TooManyErrors + )); +} + +#[tokio::test] +async fn test_request_too_large() { + let stream = tokio_test::io::Builder::new() + .write(b"220 test ESMTP IC SMTP Gateway\r\n") + .read(format!("EHLO {}", "1".repeat(2048)).as_bytes()) + .read(format!("{}\r\n", "1".repeat(2048)).as_bytes()) + .write(b"554 5.3.4 Line is too long.\r\n") + .read(b"QUIT\r\n") + .write(b"221 2.0.0 Bye.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::Quit + )); +} + +#[tokio::test] +async fn test_max_session_transfer_quota() { + let stream = tokio_test::io::Builder::new() + .write(b"220 test ESMTP IC SMTP Gateway\r\n") + .read(format!("EHLO {}\r\n", "1".repeat(8192)).as_bytes()) + .write(b"452 4.7.28 Session transfer quota exceeded.\r\n") + .build(); + + let mut session = create_session(stream, None); + + assert!(matches!( + session.handle(CancellationToken::new()).await.unwrap_err(), + SessionError::TransferQuotaExceeded(_) + )); +} + +#[tokio::test] +async fn test_starttls() { + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .ok(); + + // Use an in-memory pipe + let (stream1, mut stream2) = duplex(8192); + + let rustls_server_cfg = ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(Arc::new( + StubResolver::new(TEST_CERT_1.as_bytes(), TEST_KEY_1.as_bytes()).unwrap(), + )); + + let mut cfg = SessionConfig::new("test", 512); + cfg.tls_mode = SessionTlsMode::Required(Arc::new(rustls_server_cfg)); + + tokio::spawn(async move { + SessionManager::handle_connection( + stream1, + SocketAddr::from_str("1.1.1.1:123").unwrap(), + Arc::new(cfg), + CancellationToken::new(), + ) + .await; + }); + + let mut buf = vec![0; 8192]; + + let r = stream2.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"220 test ESMTP IC SMTP Gateway\r\n"); + + // Make sure EHLO advertises 250-STARTTLS + stream2.write_all(b"EHLO foo.bar\r\n").await.unwrap(); + let r = stream2.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"250-test you had me at EHLO\r\n250-STARTTLS\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); + + // Make sure TLS is required by the server due to SessionTlsMode::Required + stream2.write_all(b"MAIL FROM:\r\n").await.unwrap(); + let r = stream2.read(&mut buf).await.unwrap(); + assert_eq!( + &buf[..r], + b"503 5.5.1 TLS is required to submit mail on this server.\r\n" + ); + + // Fire up TLS handshake + stream2.write_all(b"STARTTLS\r\n").await.unwrap(); + let r = stream2.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"220 2.0.0 Ready to start TLS.\r\n"); + + let rustls_client_cfg = ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoopServerCertVerifier::default())) + .with_no_client_auth(); + let tls_connector = TlsConnector::from(Arc::new(rustls_client_cfg)); + let mut tls_stream = tls_connector + .connect(ServerName::try_from("foo").unwrap(), stream2) + .await + .unwrap(); + + // Make sure there's no 250-STARTTLS and 250-REQUIRETLS in EHLO anymore inside TLS session + tls_stream.write_all(b"EHLO foo.bar\r\n").await.unwrap(); + let r = tls_stream.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"250-test you had me at EHLO\r\n250-SMTPUTF8\r\n250-SIZE 512\r\n250-PIPELINING\r\n250-ENHANCEDSTATUSCODES\r\n250-CHUNKING\r\n250 8BITMIME\r\n"); + + // No TLS-in-TLS allowed + tls_stream.write_all(b"STARTTLS\r\n").await.unwrap(); + let r = tls_stream.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"504 5.7.4 Already in TLS mode.\r\n"); + + // Now MAIL FROM should work + tls_stream.write_all(b"MAIL FROM:\r\n").await.unwrap(); + let r = tls_stream.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"250 2.1.0 OK\r\n"); + + // All good + tls_stream.write_all(b"QUIT\r\n").await.unwrap(); + let r = tls_stream.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..r], b"221 2.0.0 Bye.\r\n"); +} + +#[tokio::test] +async fn test_with_smtp_client() { + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .ok(); + + let rustls_server_cfg = ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(Arc::new( + StubResolver::new(TEST_CERT_1.as_bytes(), TEST_KEY_1.as_bytes()).unwrap(), + )); + + let agent = Arc::new(TestDeliveryAgent::default()); + let notification_handler = Arc::new(TestNotificationsReceiver::default()); + + // Listen on the random free port + let listener = listen_tcp("127.0.0.1:0".parse().unwrap(), ListenerOpts::default()).unwrap(); + let port = listener.local_addr().unwrap().port(); + + let mut cfg = SessionConfig::new("test", 10 * 1024 * 1024); + cfg.delivery_agent = agent.clone(); + cfg.tls_mode = SessionTlsMode::Allowed(Arc::new(rustls_server_cfg)); + cfg.notifications_handler = Some(notification_handler.clone()); + + let token = CancellationToken::new(); + let token_child = token.child_token(); + let server = Server::new_with_listener(listener, cfg).unwrap(); + let server_handle = tokio::spawn(async move { + server.serve(token_child).await.unwrap(); + }); + + let message = MessageBuilder::new() + .from(("John Doe", "john@doe.com")) + .to(("Jane Doe", "jane@doe.com")) + .subject("Hello") + .text_body("Blah"); + + let mut client = SmtpClientBuilder::new("127.0.0.1", port) + .unwrap() + .implicit_tls(false) + .helo_host("foo.bar") + .allow_invalid_certs() + .connect() + .await + .unwrap(); + + // Try some commands + client.noop().await.unwrap(); + client.rset().await.unwrap(); + + // Make sure we have the required caps + let caps = client.capabilities("foo.bar", false).await.unwrap(); + assert_eq!( + caps.capabilities, + EXT_SMTP_UTF8 + | EXT_8BIT_MIME + | EXT_CHUNKING + | EXT_ENHANCED_STATUS_CODES + | EXT_SIZE + | EXT_PIPELINING + ); + + client.send(message).await.unwrap(); + // Send some bad command to emit a protocol error notification + client.cmd(b"FOOBAR\r\n").await.ok(); + client.quit().await.unwrap(); + + // Make sure the agent gets the correct mail + let msg = agent.0.lock().unwrap().clone().unwrap(); + assert_eq!(msg.id, Uuid::nil()); + assert_eq!(msg.mail_from, "john@doe.com"); + assert_eq!(msg.rcpt_to, vec!["jane@doe.com"]); + + let parsed = MessageParser::new().parse(&msg.body).unwrap(); + assert_eq!(parsed.subject(), Some("Hello")); + assert_eq!( + *parsed.from().unwrap(), + Address::List(vec![Addr::new(Some("John Doe"), "john@doe.com")]) + ); + assert_eq!( + *parsed.to().unwrap(), + Address::List(vec![Addr::new(Some("Jane Doe"), "jane@doe.com")]) + ); + assert_eq!(parsed.body_text(0).unwrap(), "Blah"); + + // Shutdown the server + token.cancel(); + server_handle.await.unwrap(); + + // Check notifications + let (meta, msg, error) = notification_handler.msg.lock().unwrap().clone().unwrap(); + assert_eq!(meta.id, Uuid::nil()); + assert_eq!(meta.message_id, Uuid::nil()); + assert_eq!(msg.mail_from, "john@doe.com"); + assert_eq!(msg.rcpt_to, vec!["jane@doe.com"]); + assert!(error.is_none()); + + let (meta, error) = notification_handler.sess.lock().unwrap().take().unwrap(); + assert_eq!(meta.id, Uuid::nil()); + assert_eq!(meta.remote_ip, IpAddr::from_str("127.0.0.1").unwrap()); + assert!(matches!(meta.last_error, Some(ProtocolError::SmtpError(_)))); + assert_eq!(meta.tls_info.unwrap().protocol, ProtocolVersion::TLSv1_3); + assert!(matches!(error, Some(SessionError::Quit))); + + // Error from the FOOBAR command + let (meta, error) = notification_handler + .proto_error + .lock() + .unwrap() + .take() + .unwrap(); + assert_eq!(meta.id, Uuid::nil()); + assert_eq!(meta.remote_ip, IpAddr::from_str("127.0.0.1").unwrap()); + assert!(matches!(error, ProtocolError::SmtpError(_))); +} + +#[test] +fn test_session_error() { + let s: &'static str = SessionError::Quit.into(); + assert_eq!(s, "quit"); + + let s: &'static str = SessionError::SendsBeforeGreeting.into(); + assert_eq!(s, "sends_before_greeting"); + + let s: &'static str = SessionError::TooManyMessagesPerSession.into(); + assert_eq!(s, "too_many_messages_per_session"); +} diff --git a/ic-bn-lib/src/smtp/mod.rs b/ic-bn-lib/src/smtp/mod.rs index 1a1646d..ca5ef19 100644 --- a/ic-bn-lib/src/smtp/mod.rs +++ b/ic-bn-lib/src/smtp/mod.rs @@ -1,14 +1,19 @@ -use std::fmt::{Debug, Display}; +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; use async_trait::async_trait; use bytes::Bytes; -use fqdn::FQDN; use itertools::Itertools; -use strum::Display; +use strum::{Display, IntoStaticStr}; use tracing::warn; use uuid::Uuid; -use crate::smtp::address::EmailAddress; +use crate::smtp::{ + address::EmailAddress, + inbound::{SessionError, SessionMeta}, +}; pub mod address; pub mod cli; @@ -49,11 +54,46 @@ pub enum DeliveryError { Permanent(String), } +/// Error that might happen during message validation or delivery +#[derive(thiserror::Error, Clone, Debug, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub enum MessageError { + #[error("Delivery failed: {0}")] + DeliveryFailed(#[from] DeliveryError), + #[error("Parsing failed")] + ParsingFailed, + #[error("Too many 'Received' headers")] + TooManyReceivedHeaders, + #[error("DKIM validation failed: {0}")] + DkimValidationFailed(String), +} + +/// Error that might happen during SMTP exchange +#[derive(thiserror::Error, Clone, Debug, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub enum ProtocolError { + #[error("Invalid EHLO hostname: {0}")] + InvalidEhloHostname(String), + #[error("Invalid command sequence: {0}")] + InvalidSequenceOfCommands(String), + #[error("Sender validation failed: {0}")] + SenderValidationFailed(String), + #[error("Recipient validation failed: {0}")] + RecipientValidationFailed(String), + #[error("Reverse IP validation failed: {0}")] + ReverseIpValidationFailed(String), + #[error("SPF validation failed: {0}")] + SpfValidationFailed(String), + #[error("Message too big: {0}")] + MessageTooBig(String), + #[error("SMTP protocol error: {0}")] + SmtpError(String), +} + /// Low-level E-Mail representation #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub struct EmailMessage { pub id: Uuid, - pub ehlo_hostname: FQDN, pub mail_from: EmailAddress, pub rcpt_to: Vec, pub body: Bytes, @@ -63,9 +103,8 @@ impl Display for EmailMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "id: {}, ehlo: {}, from: {}, to: {}, msg: {}", + "id: {}, from: {}, to: {}, msg: {}", self.id, - self.ehlo_hostname, self.mail_from, self.rcpt_to.iter().map(|x| x.to_string()).join(", "), String::from_utf8_lossy(&self.body) @@ -85,10 +124,30 @@ pub trait ResolvesRecipient: Send + Sync + Debug { ) -> Result; } +/// Notifies about events +#[async_trait] +pub trait ReceivesNotifications: Send + Sync + Debug { + /// Notify when the message is queued or the validation failed + async fn notify_message( + &self, + meta: SessionMeta, + message: Arc, + error: Option, + ); + /// Notify when the protocol error happens + async fn notify_protocol_error(&self, meta: SessionMeta, error: ProtocolError); + /// Notify when the session is finished + async fn notify_session_finish(&self, meta: SessionMeta, error: Option); +} + /// Delivers the E-Mail message #[async_trait] pub trait DeliversMail: Send + Sync + Debug { - async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError>; + async fn deliver_mail( + &self, + meta: SessionMeta, + message: Arc, + ) -> Result<(), DeliveryError>; } #[derive(Debug)] @@ -111,7 +170,11 @@ pub struct DummyDeliveryAgent; #[async_trait] impl DeliversMail for DummyDeliveryAgent { - async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError> { + async fn deliver_mail( + &self, + _meta: SessionMeta, + message: Arc, + ) -> Result<(), DeliveryError> { warn!("DummyDeliveryAgent: {message}"); Ok(()) }