Skip to content
Merged
2 changes: 2 additions & 0 deletions ic-bn-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub use ic_bn_lib_common;
#[cfg(feature = "smtp")]
pub use mail_auth;
pub use prometheus;
#[cfg(feature = "acme")]
pub use rcgen;
pub use reqwest;
pub use rustls;
#[cfg(feature = "acme-alpn")]
Expand Down
50 changes: 37 additions & 13 deletions ic-bn-lib/src/smtp/ic/delivery_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
candid::{Envelope, SmtpRequest, SmtpResponse},
parse_email,
},
inbound::SessionMeta,
},
};

Expand Down Expand Up @@ -242,10 +243,14 @@ impl IcSmtpDeliveryAgent {

#[async_trait]
impl DeliversMail for IcSmtpDeliveryAgent {
async fn deliver_mail(&self, message: EmailMessage) -> Result<(), DeliveryError> {
async fn deliver_mail(
&self,
meta: SessionMeta,
message: Arc<EmailMessage>,
) -> Result<(), DeliveryError> {
info!(
"{self}: delivering mail, ehlo: '{}', from: '{}', to: '{:?}', id '{}'",
message.ehlo_hostname, message.mail_from, message.rcpt_to, message.id
"{self}: delivering mail, ehlo: {:?}, from: '{}', to: '{:?}', id '{}'",
meta.ehlo_hostname, message.mail_from, message.rcpt_to, message.id
);

// A single message can be (potentially) destined for several canisters/domains.
Expand All @@ -256,17 +261,17 @@ impl DeliversMail for IcSmtpDeliveryAgent {
// The future in this loop usually resolves instantly due to the nature of the SMTP protocol.
// Before the mail is delivered it goes through an RCPT TO sequence which populates the cache.
// So making it concurrent isn't worth it probably currently.
for rcpt in message.rcpt_to {
for rcpt in &message.rcpt_to {
// Figure out which canister we should talk to
let canister_id = self
.resolve_canister_id(&rcpt)
.resolve_canister_id(rcpt)
.await
.ok_or_else(|| DeliveryError::Permanent("Unknown domain".into()))?;

if let Some(v) = mapping.get_mut(&canister_id) {
v.push(rcpt);
v.push(rcpt.clone());
} else {
mapping.insert(canister_id, vec![rcpt]);
mapping.insert(canister_id, vec![rcpt.clone()]);
}
}

Expand Down Expand Up @@ -359,14 +364,20 @@ impl ResolvesRecipient for IcSmtpDeliveryAgent {

#[cfg(test)]
mod tests {
use std::sync::{
Mutex,
atomic::{AtomicUsize, Ordering},
use std::{
net::IpAddr,
sync::{
Mutex,
atomic::{AtomicUsize, Ordering},
},
};

use crate::{
email,
smtp::ic::candid::{Header, Message, SmtpOk, SmtpRequestError},
smtp::{
ic::candid::{Header, Message, SmtpOk, SmtpRequestError},
inbound::SessionCounters,
},
};

use super::*;
Expand Down Expand Up @@ -607,7 +618,6 @@ mod tests {

let message = EmailMessage {
id: Uuid::nil(),
ehlo_hostname: fqdn!("foo.bar"),
mail_from: email!("john@doe.com"),
rcpt_to: vec![
// these two go to qoctq-giaaa-aaaaa-aaaea-cai as a single mail
Expand All @@ -619,7 +629,21 @@ mod tests {
body: message.as_bytes().into(),
};

delivery_agent.deliver_mail(message.clone()).await.unwrap();
let meta = SessionMeta {
id: Uuid::nil(),
message_id: Uuid::nil(),
remote_ip: IpAddr::from_str("1.1.1.1").unwrap(),
tls_info: None,
ehlo_hostname: None,
counters: SessionCounters::new(),
last_error: None,
mail_from: None,
rcpt_to: vec![],
};
delivery_agent
.deliver_mail(meta, Arc::new(message.clone()))
.await
.unwrap();

let body = indoc! {r#"
--XXXXboundary text
Expand Down
18 changes: 17 additions & 1 deletion ic-bn-lib/src/smtp/inbound/ehlo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use tracing::{debug, info};
use crate::{
http::dns::is_error_negative_lookup,
network::AsyncReadWrite,
smtp::inbound::{Session, SessionResult},
smtp::{
ProtocolError,
inbound::{Session, SessionResult},
},
};

impl<S: AsyncReadWrite> Session<S> {
Expand All @@ -15,6 +18,9 @@ impl<S: AsyncReadWrite> Session<S> {
// Validate hostname
let Ok(ehlo_hostname) = FQDN::from_str(host) else {
info!("{self}: {host}: Invalid EHLO hostname");
self.set_error(ProtocolError::InvalidEhloHostname(format!(
"{host}: incorrect hostname"
)));
return self.reply("550", "5.5.0", "Invalid EHLO hostname.").await;
};

Expand All @@ -28,6 +34,9 @@ impl<S: AsyncReadWrite> Session<S> {

if ehlo_hostname.depth() < 2 {
info!("{self}: {host}: EHLO is not FQDN");
self.set_error(ProtocolError::InvalidEhloHostname(format!(
"{host}: not FQDN"
)));
return self
.reply("550", "5.5.0", "EHLO hostname must be an FQDN.")
.await;
Expand All @@ -42,6 +51,9 @@ impl<S: AsyncReadWrite> Session<S> {
}
None => {
info!("{self}: {host}: EHLO not found in DNS");
self.set_error(ProtocolError::InvalidEhloHostname(format!(
"{host}: not found in DNS"
)));
return self
.reply("550", "5.5.0", "EHLO hostname not found in DNS.")
.await;
Expand All @@ -52,6 +64,10 @@ impl<S: AsyncReadWrite> Session<S> {
info!("{self}: {host}: EHLO not found in DNS: {e:#}");

if is_error_negative_lookup(&e) {
self.set_error(ProtocolError::InvalidEhloHostname(format!(
"{host}: not found in DNS: {e:#}"
)));

return self
.reply("550", "5.5.0", "EHLO hostname not found in DNS.")
.await;
Expand Down
42 changes: 39 additions & 3 deletions ic-bn-lib/src/smtp/inbound/mail_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
http::dns::is_error_negative_lookup,
network::AsyncReadWrite,
smtp::{
ProtocolError,
address::EmailAddress,
inbound::{Session, SessionResult},
},
Expand All @@ -22,12 +23,18 @@ impl<S: AsyncReadWrite> Session<S> {
/// Handles MAIL FROM command
pub async fn handle_mail_from(&mut self, from: MailFrom<Cow<'_, str>>) -> SessionResult<()> {
let Some(helo_hostname) = self.data.ehlo_hostname.as_ref().map(|x| x.to_string()) else {
self.set_error(ProtocolError::InvalidSequenceOfCommands(
"MAIL FROM before EHLO".into(),
));
return self
.reply("503", "5.5.1", "Polite people say EHLO first.")
.await;
};

if self.data.mail_from.is_some() {
self.set_error(ProtocolError::InvalidSequenceOfCommands(
"Multiple MAIL FROM".into(),
));
return self
.reply(
"503",
Expand Down Expand Up @@ -56,6 +63,10 @@ impl<S: AsyncReadWrite> Session<S> {
}

if from.size > self.cfg.max_message_size {
self.set_error(ProtocolError::MessageTooBig(format!(
"MAIL FROM-specified size is too big: {} > {}",
from.size, self.cfg.max_message_size
)));
return self.message_too_big().await;
}

Expand All @@ -70,6 +81,10 @@ impl<S: AsyncReadWrite> Session<S> {
// Validate address
let Ok(address) = EmailAddress::from_str(&from.address) else {
info!("{self}: {}: incorrect sender address", from.address);
self.set_error(ProtocolError::SenderValidationFailed(format!(
"Incorrect sender address: {}",
from.address
)));
return self
.reply("550", "5.7.1", "Sender address is incorrect.")
.await;
Expand All @@ -88,6 +103,10 @@ impl<S: AsyncReadWrite> Session<S> {
if self.cfg.verify_sender_domain {
if address.domain().depth() < 2 {
info!("{self}: {address}: sender domain verification failed: not FQDN");
self.set_error(ProtocolError::SenderValidationFailed(format!(
"Sender domain is not FQDN: {}",
address.domain()
)));
return self.reply("550", "5.7.2", "Sender must be an FQDN.").await;
};

Expand All @@ -103,6 +122,9 @@ impl<S: AsyncReadWrite> Session<S> {
info!(
"{self}: {address}: sender domain verification failed: no MX records found"
);
self.set_error(ProtocolError::SenderValidationFailed(
"No MX records found".into(),
));
return self
.reply(
"550",
Expand All @@ -117,6 +139,9 @@ impl<S: AsyncReadWrite> Session<S> {
info!(
"{self}: {address}: sender domain verification failed: no MX records found"
);
self.set_error(ProtocolError::SenderValidationFailed(
"No MX records found".into(),
));
return self
.reply(
"550",
Expand All @@ -128,6 +153,9 @@ impl<S: AsyncReadWrite> Session<S> {
info!(
"{self}: {address}: sender domain verification failed: temporary error: {e:#}"
);
self.set_error(ProtocolError::SenderValidationFailed(format!(
"Sender domain verification temporary error: {e:#}",
)));
return self
.reply("451", "4.7.25", "Temporary error validating sender domain.")
.await;
Expand Down Expand Up @@ -157,7 +185,10 @@ impl<S: AsyncReadWrite> Session<S> {
"{self}: {address}: SPF validation failed: temporary error: {:?}",
output.explanation()
);

self.set_error(ProtocolError::SpfValidationFailed(format!(
"SPF validation temporary error: {:?}",
output.explanation()
)));
return self
.reply("451", "4.7.24", "Temporary SPF validation error.")
.await;
Expand All @@ -167,7 +198,10 @@ impl<S: AsyncReadWrite> Session<S> {
"{self}: {address}: SPF validation failed: permanent error: {:?}",
output.explanation()
);

self.set_error(ProtocolError::SpfValidationFailed(format!(
"SPF validation permanent error: {:?}",
output.explanation()
)));
return self
.reply_with("550", "5.7.23", |buf| {
write!(buf, "SPF validation failed")?;
Expand All @@ -191,6 +225,8 @@ impl<S: AsyncReadWrite> Session<S> {

/// Replies about failed reverse IP verification
async fn verify_reverse_ip_reply(&mut self, permanent: bool, msg: &str) -> SessionResult<bool> {
self.set_error(ProtocolError::ReverseIpValidationFailed(msg.into()));

// Emit permanent errors only if in strict mode
if permanent && self.cfg.verify_reverse_ip_strict {
self.reply_with("550", "5.7.25", |buf| {
Expand Down Expand Up @@ -309,7 +345,7 @@ impl<S: AsyncReadWrite> Session<S> {
.await;
}

// Otherwise everything succeeded but no matches found
// Otherwise everything succeeded but no matches were found
info!(
"{self}: reverse IP verification failed: no addresses matching client's IP found after resolving PTR"
);
Expand Down
45 changes: 40 additions & 5 deletions ic-bn-lib/src/smtp/inbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ impl SessionManager {
match session.handle(shutdown_token.child_token()).await {
Ok(v) => match v {
SessionUpgrade::No => {
Self::notify(&session, None).await;
session.stream.shutdown().await.ok();
}
Comment thread
blind-oracle marked this conversation as resolved.

SessionUpgrade::StartTls => {
Self::starttls(session, shutdown_token.child_token()).await
Self::handle_connection_tls(session, shutdown_token.child_token()).await
}
},

Expand All @@ -49,15 +49,20 @@ impl SessionManager {
if let Err(e) = session.shutdown().await {
debug!("{session}: error closing connection: {e:#}");
};

Self::notify(&session, Some(e)).await;
}
}
}

/// Converts session into TLS mode & runs it
async fn starttls<S: AsyncReadWrite>(session: Session<S>, shutdown_token: CancellationToken) {
async fn handle_connection_tls<S: AsyncReadWrite>(
session: Session<S>,
shutdown_token: CancellationToken,
) {
let session_name = session.to_string();
debug!("{session}: starting TLS handshake");

debug!("{session_name}: starting TLS handshake");
match session.into_tls().await {
Ok(mut session) => {
debug!("{session}: TLS handshake succeeded");
Expand All @@ -70,6 +75,8 @@ impl SessionManager {
if let Err(e) = session.shutdown().await {
debug!("{session}: error closing connection: {e:#}");
};

Self::notify(&session, Some(e)).await;
}
}

Expand All @@ -78,6 +85,15 @@ impl SessionManager {
}
};
}

async fn notify<S: AsyncReadWrite>(session: &Session<S>, error: Option<SessionError>) {
if let Some(v) = session.cfg.notifications_handler.clone() {
let meta = session.meta();
tokio::spawn(async move {
v.notify_session_finish(meta, error).await;
});
}
}
}

impl<S: AsyncReadWrite> Session<S> {
Expand All @@ -92,7 +108,26 @@ impl<S: AsyncReadWrite> Session<S> {
}
};

let (stream, tls_info) = tls_handshake(tls_config, self.stream).await?;
let meta = self.meta();
let (stream, tls_info) = match tls_handshake(tls_config, self.stream).await {
Ok(v) => v,
Err(e) => {
let error_str = e.to_string();

// Session is partially consumed by `tls_handshake`, so we can't use `Manager::notify()`
if let Some(v) = self.cfg.notifications_handler.clone() {
tokio::spawn(async move {
v.notify_session_finish(
meta,
Some(SessionError::TlsHandshakeFailed(error_str.clone())),
)
.await;
});
}

return Err(SessionError::TlsHandshakeFailed(e.to_string()));
}
};

Ok(Session {
id: self.id,
Expand Down
Loading
Loading