Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion ic-bn-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod utils;
#[cfg(feature = "vector")]
pub mod vector;

use std::{fs::File, path::Path};
use std::{fs::File, net::IpAddr, path::Path};

use anyhow::{Context, anyhow};
use bytes::Bytes;
Expand Down Expand Up @@ -171,6 +171,28 @@ macro_rules! retry_async {
};
}

/// Returns family of an IP address
pub trait IpFamily {
fn family(&self) -> &'static str;
}

impl IpFamily for IpAddr {
fn family(&self) -> &'static str {
if self.is_ipv4() { "v4" } else { "v6" }
}
}

/// Converts bool to yes/no static str
pub trait BoolYesNo {
fn yesno(&self) -> &'static str;
}

impl BoolYesNo for bool {
fn yesno(&self) -> &'static str {
if *self { "yes" } else { "no" }
}
}

#[macro_export]
macro_rules! dyn_event {
($lvl:ident, $($arg:tt)+) => {
Expand Down
5 changes: 3 additions & 2 deletions ic-bn-lib/src/smtp/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ pub struct SmtpServerCli {
#[clap(env, long, default_value = "5")]
pub smtp_server_max_errors_per_session: usize,

/// Maximum message body size
#[clap(env, long, default_value = "2MB", value_parser = parse_size)]
/// Maximum message body size.
/// Default accounts for max IC message size + some overhead.
#[clap(env, long, default_value = "1950KB", value_parser = parse_size)]
pub smtp_server_max_message_size: u64,

/// How much data can be ingested during a single SMTP session
Expand Down
1 change: 1 addition & 0 deletions ic-bn-lib/src/smtp/ic/candid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct SmtpRequest {
pub message: Option<Message>,
pub envelope: Option<Envelope>,
pub gateway_flags: Option<Vec<String>>,
pub message_id: Option<String>,
Comment thread
blind-oracle marked this conversation as resolved.
}

/// Candid `SmtpRequestError` (`code` is `nat64` on the wire in typical canisters).
Expand Down
137 changes: 117 additions & 20 deletions ic-bn-lib/src/smtp/ic/delivery_agent.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,41 @@
use std::{fmt::Display, str::FromStr, sync::Arc, time::Duration};
use std::{
fmt::Display,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};

use ahash::{AHashMap, RandomState};
use async_trait::async_trait;
use candid::Principal;
use futures::future::try_join_all;
use futures::future::join_all;
use http::Method;
use ic_agent::{Agent, AgentError};
use ic_bn_lib_common::traits::http::Client;
use moka::sync::Cache;
use strum::IntoStaticStr;
use tracing::{debug, info};
use url::Url;

use crate::{
BoolYesNo,
custom_domains::LooksUpCustomDomain,
smtp::{
DeliversMail, DeliveryError, EmailMessage, RecipientPolicy, RecipientResolveError,
ResolvesRecipient,
address::EmailAddress,
ic::{
ExecutesIcSmtpRequest, IcSmtpRequestExecutor,
ExecutesIcSmtpRequest, IcSmtpRequestExecutor, Metrics,
candid::{Envelope, SmtpRequest, SmtpResponse},
parse_email,
},
inbound::SessionMeta,
},
truncate,
};

#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub enum IcSmtpDeliveryAgentError {
#[error("IC Agent error: {0}")]
Agent(#[from] ic_agent::AgentError),
Expand All @@ -43,6 +52,7 @@ pub struct IcSmtpDeliveryAgent {
http_client: Arc<dyn Client>,
ic_base_domain: String,
smtp_canister_id_cache: Cache<Principal, Principal, RandomState>,
metrics: Metrics,
}

impl Display for IcSmtpDeliveryAgent {
Expand All @@ -60,6 +70,7 @@ impl IcSmtpDeliveryAgent {
ic_base_domain: &str,
cache_ttl: Duration,
cache_capacity: u64,
metrics: Metrics,
) -> Self {
let smtp_canister_id_cache = Cache::builder()
.time_to_live(cache_ttl)
Expand All @@ -72,6 +83,7 @@ impl IcSmtpDeliveryAgent {
http_client,
ic_base_domain: ic_base_domain.into(),
smtp_canister_id_cache,
metrics,
}
}

Expand All @@ -83,6 +95,7 @@ impl IcSmtpDeliveryAgent {
ic_base_domain: &str,
cache_ttl: Duration,
cache_capacity: u64,
metrics: Metrics,
) -> Self {
let request_executor = Arc::new(IcSmtpRequestExecutor::new(agent));

Expand All @@ -93,9 +106,39 @@ impl IcSmtpDeliveryAgent {
ic_base_domain,
cache_ttl,
cache_capacity,
metrics,
)
}

fn observe_canister_lookup(
&self,
success: bool,
custom_domain: bool,
smtp_canister: bool,
cached: bool,
elapsed: Duration,
) {
self.metrics
.canister_id_lookups
.with_label_values(&[
success.yesno(),
custom_domain.yesno(),
smtp_canister.yesno(),
cached.yesno(),
])
.inc();

self.metrics
.canister_id_lookup_latency
.with_label_values(&[
success.yesno(),
custom_domain.yesno(),
smtp_canister.yesno(),
cached.yesno(),
])
.observe(elapsed.as_secs_f64());
}

/// Executes an HTTP request to the canister to get the SMTP canister id
async fn lookup_smtp_canister_id(&self, canister_id: Principal) -> Option<Principal> {
let url = Url::parse(&format!(
Expand Down Expand Up @@ -141,22 +184,23 @@ impl IcSmtpDeliveryAgent {
}
Err(e) => {
// Sanitize a bit
let mut body_str = body_str.replace("\r", " ").replace("\n", " ");
body_str.truncate(128);
let body_str = body_str.replace("\r", " ").replace("\n", " ");
let body_str = truncate(&body_str, 128);
info!("{self}: {canister_id}: Incorrect SMTP canister ID: '{body_str}': {e:#}");
None
}
}
}

/// Resolves SMTP canister ID for the given canister_id
async fn resolve_smtp_canister_id(&self, canister_id: Principal) -> Principal {
/// Resolves SMTP canister ID for the given canister_id.
/// Returns also if it was obtained from the cache.
async fn resolve_smtp_canister_id(&self, canister_id: Principal) -> (Principal, bool) {
Comment thread
blind-oracle marked this conversation as resolved.
debug!("{self}: {canister_id}: Looking up SMTP canister ID");

// Try to find SMTP canister ID, check the cache first
if let Some(v) = self.smtp_canister_id_cache.get(&canister_id) {
debug!("{self}: {canister_id}: SMTP canister ID found in cache: {v}");
return v;
return (v, true);
}

// Otherwise do a lookup with a fallback to canister_id
Expand All @@ -173,13 +217,15 @@ impl IcSmtpDeliveryAgent {
.insert(canister_id, smtp_canister_id);

debug!("{self}: {canister_id}: SMTP canister ID obtained: {smtp_canister_id}");
smtp_canister_id
(smtp_canister_id, false)
}

/// Resolves destination SMTP canister id for the given address
async fn resolve_canister_id(&self, address: &EmailAddress) -> Option<Principal> {
debug!("{self}: {address}: resolving SMTP canister ID");
let start = Instant::now();

let mut custom_domain = false;
// First check if the target domain has a canister as 1st label.
// This covers addresses like "foo@qoctq-giaaa-aaaaa-aaaea-cai.icp0.io"
let lbl = address.domain().labels().next()?;
Expand All @@ -194,15 +240,26 @@ impl IcSmtpDeliveryAgent {
.lookup_custom_domain(address.domain())
.inspect(|x| {
debug!("{self}: {address}: found custom domain canister ID: {x}");
custom_domain = true;
})
})
else {
debug!("{self}: {address}: unable to resolve canister ID");
self.observe_canister_lookup(false, false, false, false, start.elapsed());
return None;
};

// Finally check if there's an SMTP canister ID defined
Some(self.resolve_smtp_canister_id(canister_id).await)
let (smtp_canister_id, cached) = self.resolve_smtp_canister_id(canister_id).await;
self.observe_canister_lookup(
true,
custom_domain,
smtp_canister_id != canister_id,
cached,
start.elapsed(),
);

Some(smtp_canister_id)
}

/// Delivers given SMTP request to the canister
Expand Down Expand Up @@ -249,8 +306,11 @@ impl DeliversMail for IcSmtpDeliveryAgent {
message: Arc<EmailMessage>,
) -> Result<(), DeliveryError> {
info!(
"{self}: delivering mail, ehlo: {:?}, from: '{}', to: '{:?}', id '{}'",
meta.ehlo_hostname, message.mail_from, message.rcpt_to, message.id
"{self}: delivering mail, ehlo: {}, from: '{}', to: '{:?}', id '{}'",
meta.ehlo_hostname.unwrap_or_default(),
message.mail_from,
message.rcpt_to,
message.id
);

// A single message can be (potentially) destined for several canisters/domains.
Expand Down Expand Up @@ -288,13 +348,35 @@ impl DeliversMail for IcSmtpDeliveryAgent {
}),
message: Some(parsed.clone()),
gateway_flags: None,
message_id: Some(message.id.to_string()),
};

futs.push(self.smtp_request_deliver(canister_id, ic_smtp_request));
futs.push(async move {
let start = Instant::now();
let res = self
.smtp_request_deliver(canister_id, ic_smtp_request)
.await;

let error_lbl: &'static str = if let Err(e) = &res { e.into() } else { "" };
self.metrics
.smtp_requests
.with_label_values(&["no", error_lbl])
.inc();
self.metrics
.smtp_request_latency
.with_label_values(&["no", error_lbl])
.observe(start.elapsed().as_secs_f64());

res
});
}

try_join_all(futs).await?;
Ok(())
// Find & return 1st error if there's any
join_all(futs)
.await
.into_iter()
.find(|x| x.is_err())
.unwrap_or(Ok(()))
}
}

Expand All @@ -320,9 +402,11 @@ impl ResolvesRecipient for IcSmtpDeliveryAgent {
}),
message: None,
gateway_flags: None,
message_id: None,
};

let ic_smtp_response = self
let start = Instant::now();
let res = self
.request_executor
.canister_request(canister_id, ic_smtp_request, true)
.await
Expand All @@ -338,9 +422,19 @@ impl ResolvesRecipient for IcSmtpDeliveryAgent {
}

_ => RecipientResolveError::Temporary(e.to_string()),
})?;

if let SmtpResponse::Err(e) = ic_smtp_response {
});

let error_lbl: &'static str = if let Err(e) = &res { e.into() } else { "" };
self.metrics
.smtp_requests
.with_label_values(&["yes", error_lbl])
.inc();
self.metrics
.smtp_request_latency
.with_label_values(&["yes", error_lbl])
.observe(start.elapsed().as_secs_f64());

if let SmtpResponse::Err(e) = res? {
info!(
"{self}: {canister_id}: failed to resolve recipient: {} {}",
e.code, e.message
Expand Down Expand Up @@ -385,6 +479,7 @@ mod tests {
use fqdn::{FQDN, fqdn};
use ic_bn_lib_common::principal;
use indoc::indoc;
use prometheus::Registry;
use uuid::Uuid;

#[derive(Debug)]
Expand Down Expand Up @@ -501,6 +596,7 @@ mod tests {
"icp0.io",
Duration::from_secs(10),
10,
Metrics::new(&Registry::new()),
),
http_client,
request_executor,
Expand Down Expand Up @@ -691,6 +787,7 @@ mod tests {
body: body.as_bytes().to_vec(),
}),
gateway_flags: None,
message_id: Some(Uuid::nil().to_string()),
}
};

Expand Down
Loading
Loading