Unit tests part 5

This commit is contained in:
Mauro D 2023-01-22 18:24:24 +00:00
parent 2becb4616d
commit 0ed994675d
42 changed files with 1484 additions and 400 deletions

View file

@ -141,6 +141,7 @@ impl Config {
| EnvelopeKey::Sender
| EnvelopeKey::SenderDomain
| EnvelopeKey::AuthenticatedAs
| EnvelopeKey::Mx
| EnvelopeKey::LocalIp
| EnvelopeKey::RemoteIp,
_,
@ -241,6 +242,7 @@ impl Config {
EnvelopeKey::RemoteIp,
EnvelopeKey::LocalIp,
EnvelopeKey::Priority,
EnvelopeKey::Mx,
];
for rule_name in self.sub_keys("rule") {

View file

@ -25,6 +25,7 @@ use ahash::{AHashMap, AHashSet};
use mail_auth::{
common::crypto::{Ed25519Key, RsaKey, Sha256},
dkim::{Canonicalization, Done},
IpLookupStrategy,
};
use mail_send::Credentials;
use regex::Regex;
@ -343,6 +344,7 @@ pub struct QueueConfig {
pub next_hop: IfBlock<Option<RelayHost>>,
pub max_mx: IfBlock<usize>,
pub max_multihomed: IfBlock<usize>,
pub ip_strategy: IfBlock<IpLookupStrategy>,
pub source_ip: QueueOutboundSourceIp,
pub tls: QueueOutboundTls,
pub dsn: Dsn,

View file

@ -133,6 +133,9 @@ impl Config {
max_multihomed: self
.parse_if_block("queue.outbound.limits.multihomed", ctx, &rcpt_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(2)),
ip_strategy: self
.parse_if_block("queue.outbound.ip-strategy", ctx, &sender_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(IpLookupStrategy::Ipv4thenIpv6)),
source_ip: QueueOutboundSourceIp {
ipv4: self
.parse_if_block("queue.outbound.source-ip.v4", ctx, &mx_envelope_keys)?
@ -141,49 +144,7 @@ impl Config {
.parse_if_block("queue.outbound.source-ip.v6", ctx, &mx_envelope_keys)?
.unwrap_or_else(|| IfBlock::new(Vec::new())),
},
next_hop: IfBlock {
if_then: {
let mut if_then = Vec::with_capacity(next_hop.if_then.len());
for i in next_hop.if_then {
if_then.push(IfThen {
conditions: i.conditions,
then: if let Some(then) = i.then {
Some(
ctx.hosts
.get(&then)
.ok_or_else(|| {
format!(
"Host {:?} not found for property \"queue.next-hop\".",
then
)
})?
.into(),
)
} else {
None
},
});
}
if_then
},
default: if let Some(default) = next_hop.default {
Some(
ctx.hosts
.get(&default)
.ok_or_else(|| {
format!(
"Relay host {:?} not found for property \"queue.next-hop\".",
default
)
})?
.into(),
)
} else {
None
},
},
next_hop: next_hop.into_relay_host(ctx)?,
tls: QueueOutboundTls {
dane: self
.parse_if_block("queue.outbound.tls.dane", ctx, &host_envelope_keys)?
@ -356,6 +317,54 @@ impl Config {
}
}
impl IfBlock<Option<String>> {
pub fn into_relay_host(self, ctx: &ConfigContext) -> super::Result<IfBlock<Option<RelayHost>>> {
Ok(IfBlock {
if_then: {
let mut if_then = Vec::with_capacity(self.if_then.len());
for i in self.if_then {
if_then.push(IfThen {
conditions: i.conditions,
then: if let Some(then) = i.then {
Some(
ctx.hosts
.get(&then)
.ok_or_else(|| {
format!(
"Host {:?} not found for property \"queue.next-hop\".",
then
)
})?
.into(),
)
} else {
None
},
});
}
if_then
},
default: if let Some(default) = self.default {
Some(
ctx.hosts
.get(&default)
.ok_or_else(|| {
format!(
"Relay host {:?} not found for property \"queue.next-hop\".",
default
)
})?
.into(),
)
} else {
None
},
})
}
}
impl From<&Host> for RelayHost {
fn from(host: &Host) -> Self {
RelayHost {

View file

@ -1,10 +1,10 @@
use mail_auth::{
common::lru::{DnsCache, LruCache},
trust_dns_resolver::{
config::{LookupIpStrategy, ResolverConfig, ResolverOpts},
config::{ResolverConfig, ResolverOpts},
system_conf::read_system_conf,
},
Resolver,
IpLookupStrategy, Resolver,
};
use crate::{core::Resolvers, outbound::dane::DnssecResolver};
@ -35,9 +35,7 @@ impl Config {
if let Some(preserve) = self.property("resolver.preserve-intermediates")? {
opts.preserve_intermediates = preserve;
}
if let Some(strategy) = self.property("resolver.strategy")? {
opts.ip_strategy = strategy;
}
if let Some(try_tcp_on_error) = self.property("resolver.try-tcp-on-error")? {
opts.try_tcp_on_error = try_tcp_on_error;
}
@ -79,14 +77,14 @@ impl Config {
}
}
impl ParseValue for LookupIpStrategy {
impl ParseValue for IpLookupStrategy {
fn parse_value(key: impl AsKey, value: &str) -> super::Result<Self> {
Ok(match value.to_lowercase().as_str() {
"ipv4-only" => LookupIpStrategy::Ipv4Only,
"ipv6-only" => LookupIpStrategy::Ipv6Only,
"ipv4-and-ipv6" => LookupIpStrategy::Ipv4AndIpv6,
"ipv6-then-ipv4" => LookupIpStrategy::Ipv6thenIpv4,
"ipv4-then-ipv6" => LookupIpStrategy::Ipv4thenIpv6,
"ipv4-only" => IpLookupStrategy::Ipv4Only,
"ipv6-only" => IpLookupStrategy::Ipv6Only,
//"ipv4-and-ipv6" => IpLookupStrategy::Ipv4AndIpv6,
"ipv6-then-ipv4" => IpLookupStrategy::Ipv6thenIpv4,
"ipv4-then-ipv6" => IpLookupStrategy::Ipv4thenIpv6,
_ => {
return Err(format!(
"Invalid IP lookup strategy {:?} for property {:?}.",

View file

@ -346,7 +346,10 @@ mod tests {
use tokio::net::TcpSocket;
use crate::config::{Config, ConfigContext, Listener, Server, ServerProtocol};
use crate::{
config::{Config, ConfigContext, Listener, Server, ServerProtocol},
tests::add_test_certs,
};
#[test]
fn parse_servers() {
@ -356,19 +359,7 @@ mod tests {
file.push("config");
file.push("servers.toml");
let mut cert_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
cert_path.push("resources");
cert_path.push("tests");
cert_path.push("certs");
let mut cert = cert_path.clone();
cert.push("tls_cert.pem");
let mut pk = cert_path.clone();
pk.push("tls_privatekey.pem");
let toml = fs::read_to_string(file)
.unwrap()
.replace("{CERT}", cert.as_path().to_str().unwrap())
.replace("{PK}", pk.as_path().to_str().unwrap());
let toml = add_test_certs(&fs::read_to_string(file).unwrap());
// Parse servers
let config = Config::parse(&toml).unwrap();

View file

@ -20,8 +20,8 @@ use tracing::Span;
use crate::{
config::{
EnvelopeKey, List, MailAuthConfig, QueueConfig, ReportConfig, Script, ServerProtocol,
SessionConfig, VerifyStrategy,
EnvelopeKey, List, MailAuthConfig, QueueConfig, ReportConfig, Script, SessionConfig,
VerifyStrategy,
},
inbound::auth::SaslToken,
outbound::{
@ -99,7 +99,7 @@ pub enum State {
pub struct ServerInstance {
pub id: String,
pub listener_id: u16,
pub protocol: ServerProtocol,
pub is_smtp: bool,
pub hostname: String,
pub greeting: Vec<u8>,
}

View file

@ -66,7 +66,7 @@ impl IsTls for TlsStream<TcpStream> {
}
.as_bytes(),
);
headers.extend_from_slice(b")\r\n\r");
headers.extend_from_slice(b")\r\n\t");
}
}

View file

@ -7,6 +7,15 @@ use crate::core::{Session, SessionAddress};
impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
pub async fn handle_rcpt_to(&mut self, to: RcptTo<String>) -> Result<(), ()> {
#[cfg(test)]
if self.instance.id.ends_with("-debug") {
if to.address.contains("fail@") {
return self.write(b"503 5.5.1 Invalid recipient.\r\n").await;
} else if to.address.contains("delay@") {
return self.write(b"451 4.5.3 Try again later.\r\n").await;
}
}
if self.data.mail_from.is_none() {
return self.write(b"503 5.5.1 MAIL is required first.\r\n").await;
} else if self.data.rcpt_to.len() >= self.params.rcpt_max {

View file

@ -9,10 +9,7 @@ use smtp_proto::{
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use crate::{
config::ServerProtocol,
core::{Envelope, Session, State},
};
use crate::core::{Envelope, Session, State};
use super::{auth::SaslToken, IsTls};
@ -33,7 +30,7 @@ impl<T: AsyncWrite + AsyncRead + IsTls + Unpin> Session<T> {
self.handle_mail_from(from).await?;
}
Request::Ehlo { host } => {
if self.instance.protocol == ServerProtocol::Smtp {
if self.instance.is_smtp {
self.handle_ehlo(host).await?;
} else {
self.write(b"500 5.5.1 Invalid command.\r\n").await?;
@ -134,9 +131,7 @@ impl<T: AsyncWrite + AsyncRead + IsTls + Unpin> Session<T> {
.await?;
}
Request::Helo { host } => {
if self.instance.protocol == ServerProtocol::Smtp
&& self.data.helo_domain.is_empty()
{
if self.instance.is_smtp && self.data.helo_domain.is_empty() {
self.data.helo_domain = host;
self.write(
format!("250 {} says hello\r\n", self.instance.hostname)
@ -148,7 +143,7 @@ impl<T: AsyncWrite + AsyncRead + IsTls + Unpin> Session<T> {
}
}
Request::Lhlo { host } => {
if self.instance.protocol == ServerProtocol::Lmtp {
if !self.instance.is_smtp {
self.handle_ehlo(host).await?;
} else {
self.write(b"502 5.5.1 Invalid command.\r\n").await?;
@ -205,8 +200,15 @@ impl<T: AsyncWrite + AsyncRead + IsTls + Unpin> Session<T> {
State::Data(receiver) => {
if self.data.message.len() + bytes.len() < self.params.max_message_size {
if receiver.ingest(&mut iter, &mut self.data.message) {
let num_rcpts = self.data.rcpt_to.len();
let message = self.queue_message().await;
self.write(message).await?;
if self.instance.is_smtp {
self.write(message).await?;
} else {
for _ in 0..num_rcpts {
self.write(message).await?;
}
}
self.reset();
state = State::default();
} else {
@ -220,8 +222,15 @@ impl<T: AsyncWrite + AsyncRead + IsTls + Unpin> Session<T> {
if receiver.ingest(&mut iter, &mut self.data.message) {
if self.can_send_data().await? {
if receiver.is_last {
let num_rcpts = self.data.rcpt_to.len();
let message = self.queue_message().await;
self.write(message).await?;
if self.instance.is_smtp {
self.write(message).await?;
} else {
for _ in 0..num_rcpts {
self.write(message).await?;
}
}
self.reset();
} else {
self.write(b"250 2.6.0 Chunk accepted.\r\n").await?;

View file

@ -8,7 +8,7 @@ use tokio::{
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use crate::{
config::Server,
config::{Server, ServerProtocol},
core::{Core, ServerInstance, Session, SessionData, SessionParameters, State},
};
@ -25,7 +25,7 @@ impl Server {
greeting: format!("220 {} {}\r\n", self.hostname, self.greeting).into_bytes(),
id: self.id,
listener_id: self.internal_id,
protocol: self.protocol,
is_smtp: self.protocol == ServerProtocol::Smtp,
hostname: self.hostname,
});

View file

@ -37,6 +37,11 @@ impl Resolvers {
return Ok(Some(value));
}
#[cfg(any(test, feature = "test"))]
if true {
return mail_auth::common::resolver::mock_resolve(key.as_ref());
}
let mut entries = Vec::new();
let tlsa_lookup = match self.dnssec.resolver.tlsa_lookup(key.as_ref()).await {
Ok(tlsa_lookup) => tlsa_lookup,

View file

@ -234,7 +234,6 @@ mod test {
}
_ => (),
}
if pos == 0 {}
}
}
r.tlsa_add(hostname, tlsa, Instant::now() + Duration::from_secs(30));
@ -255,7 +254,11 @@ mod test {
}
// Successful DANE verification
let tlsa = r.tlsa_lookup(&host).await.unwrap().unwrap();
let tlsa = r
.tlsa_lookup(format!("_25._tcp.{}.", host))
.await
.unwrap()
.unwrap();
assert_eq!(
tlsa.verify(&tracing::info_span!("test_span"), &host, Some(&certs)),

View file

@ -1,5 +1,4 @@
use std::{
borrow::Cow,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
time::{Duration, Instant},
@ -9,20 +8,21 @@ use mail_auth::{
mta_sts::TlsRpt,
report::tlsrpt::{FailureDetails, ResultType},
};
use mail_send::{Credentials, SmtpClient};
use rand::{seq::SliceRandom, Rng};
use mail_send::SmtpClient;
use smtp_proto::MAIL_REQUIRETLS;
use crate::{
config::{AggregateFrequency, RelayHost, ServerProtocol, TlsStrategy},
config::{AggregateFrequency, TlsStrategy},
core::Core,
queue::ErrorDetails,
reporting::{tls::TlsRptOptions, PolicyType, TlsEvent},
};
use super::{
lookup::ToRemoteHost,
mta_sts,
session::{read_greeting, say_helo, try_start_tls, SessionParams, StartTlsResult},
RemoteHost,
};
use crate::queue::{
manager::Queue, throttle, DeliveryAttempt, Domain, Error, Event, OnHold, QueueEnvelope,
@ -269,49 +269,24 @@ impl DeliveryAttempt {
}
};
if !mx_list.is_empty() {
// Obtain max number of MX hosts to process
let max_mx = *queue_config.max_mx.eval(&envelope).await;
let mut remote_hosts = Vec::with_capacity(max_mx);
for mx in mx_list.iter() {
if mx.exchanges.len() > 1 {
let mut slice = mx.exchanges.iter().collect::<Vec<_>>();
slice.shuffle(&mut rand::thread_rng());
for remote_host in slice {
remote_hosts.push(RemoteHost::MX(remote_host.as_str()));
if remote_hosts.len() == max_mx {
break;
}
}
} else if let Some(remote_host) = mx.exchanges.first() {
// Check for Null MX
if mx.preference == 0 && remote_host == "." {
tracing::info!(
parent: &span,
context = "dns",
event = "null-mx",
reason = "Domain does not accept messages (mull MX)",
);
domain.set_status(
Status::PermanentFailure(Error::DnsError(
"Domain does not accept messages (null MX)".to_string(),
)),
queue_config.retry.eval(&envelope).await,
);
continue 'next_domain;
}
remote_hosts.push(RemoteHost::MX(remote_host.as_str()));
if remote_hosts.len() == max_mx {
break;
}
}
}
if let Some(remote_hosts) = mx_list
.to_remote_hosts(&domain.domain, *queue_config.max_mx.eval(&envelope).await)
{
remote_hosts
} else {
// If an empty list of MXs is returned, the address is treated as if it was
// associated with an implicit MX RR with a preference of 0, pointing to that host.
vec![RemoteHost::MX(domain.domain.as_str())]
tracing::info!(
parent: &span,
context = "dns",
event = "null-mx",
reason = "Domain does not accept messages (mull MX)",
);
domain.set_status(
Status::PermanentFailure(Error::DnsError(
"Domain does not accept messages (null MX)".to_string(),
)),
queue_config.retry.eval(&envelope).await,
);
continue 'next_domain;
}
};
@ -897,7 +872,7 @@ impl DeliveryAttempt {
let mut has_pending_delivery = false;
let span = self.span.clone();
for domain in &mut self.message.domains {
for (idx, domain) in self.message.domains.iter_mut().enumerate() {
match &domain.status {
Status::TemporaryFailure(err) if domain.expires <= now => {
tracing::info!(
@ -907,11 +882,15 @@ impl DeliveryAttempt {
reason = %err,
);
for rcpt in &mut self.message.recipients {
if rcpt.domain_idx == idx {
rcpt.status = std::mem::replace(&mut rcpt.status, Status::Scheduled)
.into_permanent();
}
}
domain.status =
match std::mem::replace(&mut domain.status, Status::Completed(())) {
Status::TemporaryFailure(err) => Status::PermanentFailure(err),
_ => unreachable!(),
};
std::mem::replace(&mut domain.status, Status::Scheduled).into_permanent();
domain.changed = true;
}
Status::Scheduled if domain.expires <= now => {
@ -922,6 +901,13 @@ impl DeliveryAttempt {
reason = "Queue rate limit exceeded.",
);
for rcpt in &mut self.message.recipients {
if rcpt.domain_idx == idx {
rcpt.status = std::mem::replace(&mut rcpt.status, Status::Scheduled)
.into_permanent();
}
}
domain.status = Status::PermanentFailure(Error::Io(
"Queue rate limit exceeded.".to_string(),
));
@ -938,132 +924,6 @@ impl DeliveryAttempt {
}
}
enum RemoteHost<'x> {
Relay(&'x RelayHost),
MX(&'x str),
}
impl<'x> RemoteHost<'x> {
fn hostname(&self) -> &str {
match self {
RemoteHost::MX(host) => host,
RemoteHost::Relay(host) => host.address.as_str(),
}
}
fn fqdn_hostname(&self) -> Cow<'_, str> {
match self {
RemoteHost::MX(host) => {
if !host.ends_with('.') {
format!("{}.", host).into()
} else {
(*host).into()
}
}
RemoteHost::Relay(host) => host.address.as_str().into(),
}
}
fn port(&self) -> u16 {
match self {
RemoteHost::MX(_) => 25,
RemoteHost::Relay(host) => host.port,
}
}
fn credentials(&self) -> Option<&Credentials<String>> {
match self {
RemoteHost::MX(_) => None,
RemoteHost::Relay(host) => host.auth.as_ref(),
}
}
fn allow_invalid_certs(&self) -> bool {
match self {
RemoteHost::MX(_) => false,
RemoteHost::Relay(host) => host.tls_allow_invalid_certs,
}
}
fn implicit_tls(&self) -> bool {
match self {
RemoteHost::MX(_) => false,
RemoteHost::Relay(host) => host.tls_implicit,
}
}
fn is_smtp(&self) -> bool {
match self {
RemoteHost::MX(_) => true,
RemoteHost::Relay(host) => host.protocol == ServerProtocol::Smtp,
}
}
}
impl Core {
async fn resolve_host(
&self,
remote_host: &RemoteHost<'_>,
envelope: &QueueEnvelope<'_>,
max_multihomed: usize,
) -> Result<(Option<IpAddr>, Vec<IpAddr>), Status<(), Error>> {
let mut remote_ips = Vec::new();
let mut source_ip = None;
for (pos, remote_ip) in self
.resolvers
.dns
.ip_lookup(remote_host.fqdn_hostname().as_ref())
.await?
.take(max_multihomed)
.enumerate()
{
if pos == 0 {
if remote_ip.is_ipv4() {
let source_ips = self.queue.config.source_ip.ipv4.eval(envelope).await;
match source_ips.len().cmp(&1) {
std::cmp::Ordering::Equal => {
source_ip = IpAddr::from(*source_ips.first().unwrap()).into();
}
std::cmp::Ordering::Greater => {
source_ip = IpAddr::from(
source_ips[rand::thread_rng().gen_range(0..source_ips.len())],
)
.into();
}
std::cmp::Ordering::Less => (),
}
} else {
let source_ips = self.queue.config.source_ip.ipv6.eval(envelope).await;
match source_ips.len().cmp(&1) {
std::cmp::Ordering::Equal => {
source_ip = IpAddr::from(*source_ips.first().unwrap()).into();
}
std::cmp::Ordering::Greater => {
source_ip = IpAddr::from(
source_ips[rand::thread_rng().gen_range(0..source_ips.len())],
)
.into();
}
std::cmp::Ordering::Less => (),
}
}
}
remote_ips.push(remote_ip);
}
// Make sure there is at least one IP address
if !remote_ips.is_empty() {
Ok((source_ip, remote_ips))
} else {
Err(Status::TemporaryFailure(Error::DnsError(format!(
"No IP addresses found for {:?}.",
envelope.mx
))))
}
}
}
impl Domain {
pub fn set_status(&mut self, status: impl Into<Status<(), Error>>, schedule: &[Duration]) {
self.status = status.into();

239
src/outbound/lookup.rs Normal file
View file

@ -0,0 +1,239 @@
use std::net::IpAddr;
use mail_auth::MX;
use rand::{seq::SliceRandom, Rng};
use crate::{
core::{Core, Envelope},
queue::{Error, ErrorDetails, Status},
};
use super::RemoteHost;
impl Core {
pub(super) async fn resolve_host(
&self,
remote_host: &RemoteHost<'_>,
envelope: &impl Envelope,
max_multihomed: usize,
) -> Result<(Option<IpAddr>, Vec<IpAddr>), Status<(), Error>> {
let remote_ips = self
.resolvers
.dns
.ip_lookup(
remote_host.fqdn_hostname().as_ref(),
*self.queue.config.ip_strategy.eval(envelope).await,
max_multihomed,
)
.await
.map_err(|err| {
if let mail_auth::Error::DnsRecordNotFound(_) = &err {
Status::PermanentFailure(Error::ConnectionError(ErrorDetails {
entity: remote_host.hostname().to_string(),
details: format!("record not found for MX"),
}))
} else {
Status::TemporaryFailure(Error::ConnectionError(ErrorDetails {
entity: remote_host.hostname().to_string(),
details: format!("lookup error: {}", err),
}))
}
})?;
if let Some(remote_ip) = remote_ips.first() {
let mut source_ip = None;
if remote_ip.is_ipv4() {
let source_ips = self.queue.config.source_ip.ipv4.eval(envelope).await;
match source_ips.len().cmp(&1) {
std::cmp::Ordering::Equal => {
source_ip = IpAddr::from(*source_ips.first().unwrap()).into();
}
std::cmp::Ordering::Greater => {
source_ip = IpAddr::from(
source_ips[rand::thread_rng().gen_range(0..source_ips.len())],
)
.into();
}
std::cmp::Ordering::Less => (),
}
} else {
let source_ips = self.queue.config.source_ip.ipv6.eval(envelope).await;
match source_ips.len().cmp(&1) {
std::cmp::Ordering::Equal => {
source_ip = IpAddr::from(*source_ips.first().unwrap()).into();
}
std::cmp::Ordering::Greater => {
source_ip = IpAddr::from(
source_ips[rand::thread_rng().gen_range(0..source_ips.len())],
)
.into();
}
std::cmp::Ordering::Less => (),
}
}
Ok((source_ip, remote_ips))
} else {
Err(Status::TemporaryFailure(Error::DnsError(format!(
"No IP addresses found for {:?}.",
envelope.mx()
))))
}
}
}
pub(super) trait ToRemoteHost {
fn to_remote_hosts<'x, 'y: 'x>(
&'x self,
domain: &'y str,
max_mx: usize,
) -> Option<Vec<RemoteHost<'_>>>;
}
impl ToRemoteHost for Vec<MX> {
fn to_remote_hosts<'x, 'y: 'x>(
&'x self,
domain: &'y str,
max_mx: usize,
) -> Option<Vec<RemoteHost<'_>>> {
if !self.is_empty() {
// Obtain max number of MX hosts to process
let mut remote_hosts = Vec::with_capacity(max_mx);
'outer: for mx in self.iter() {
if mx.exchanges.len() > 1 {
let mut slice = mx.exchanges.iter().collect::<Vec<_>>();
slice.shuffle(&mut rand::thread_rng());
for remote_host in slice {
remote_hosts.push(RemoteHost::MX(remote_host.as_str()));
if remote_hosts.len() == max_mx {
break 'outer;
}
}
} else if let Some(remote_host) = mx.exchanges.first() {
// Check for Null MX
if mx.preference == 0 && remote_host == "." {
return None;
}
remote_hosts.push(RemoteHost::MX(remote_host.as_str()));
if remote_hosts.len() == max_mx {
break;
}
}
}
remote_hosts.into()
} else {
// If an empty list of MXs is returned, the address is treated as if it was
// associated with an implicit MX RR with a preference of 0, pointing to that host.
vec![RemoteHost::MX(domain)].into()
}
}
}
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
use mail_auth::{IpLookupStrategy, MX};
use crate::{config::IfBlock, core::Core, outbound::RemoteHost};
use super::ToRemoteHost;
#[tokio::test]
async fn lookup_ip() {
let ipv6 = vec![
"a:b::1".parse().unwrap(),
"a:b::2".parse().unwrap(),
"a:b::3".parse().unwrap(),
"a:b::4".parse().unwrap(),
];
let ipv4 = vec![
"10.0.0.1".parse().unwrap(),
"10.0.0.2".parse().unwrap(),
"10.0.0.3".parse().unwrap(),
"10.0.0.4".parse().unwrap(),
];
let mut core = Core::test();
core.queue.config.source_ip.ipv4 = IfBlock::new(ipv4.clone());
core.queue.config.source_ip.ipv6 = IfBlock::new(ipv6.clone());
core.resolvers.dns.ipv4_add(
"mx.foobar.org",
vec![
"172.168.0.100".parse().unwrap(),
"172.168.0.101".parse().unwrap(),
],
Instant::now() + Duration::from_secs(10),
);
core.resolvers.dns.ipv6_add(
"mx.foobar.org",
vec!["e:f::a".parse().unwrap(), "e:f::b".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);
// Ipv4 strategy
core.queue.config.ip_strategy = IfBlock::new(IpLookupStrategy::Ipv4thenIpv6);
let (source_ips, remote_ips) = core
.resolve_host(&RemoteHost::MX("mx.foobar.org"), &"envelope", 2)
.await
.unwrap();
assert!(ipv4.contains(&match source_ips.unwrap() {
std::net::IpAddr::V4(v4) => v4,
_ => unreachable!(),
}));
assert!(remote_ips.contains(&"172.168.0.100".parse().unwrap()));
// Ipv6 strategy
core.queue.config.ip_strategy = IfBlock::new(IpLookupStrategy::Ipv6thenIpv4);
let (source_ips, remote_ips) = core
.resolve_host(&RemoteHost::MX("mx.foobar.org"), &"envelope", 2)
.await
.unwrap();
assert!(ipv6.contains(&match source_ips.unwrap() {
std::net::IpAddr::V6(v6) => v6,
_ => unreachable!(),
}));
assert!(remote_ips.contains(&"e:f::a".parse().unwrap()));
}
#[test]
fn to_remote_hosts() {
let mx = vec![
MX {
exchanges: vec!["mx1".to_string(), "mx2".to_string()],
preference: 10,
},
MX {
exchanges: vec![
"mx3".to_string(),
"mx4".to_string(),
"mx5".to_string(),
"mx6".to_string(),
],
preference: 20,
},
MX {
exchanges: vec!["mx7".to_string(), "mx8".to_string()],
preference: 10,
},
MX {
exchanges: vec!["mx9".to_string(), "mxA".to_string()],
preference: 10,
},
];
let hosts = mx.to_remote_hosts("domain", 7).unwrap();
assert_eq!(hosts.len(), 7);
for host in hosts {
if let RemoteHost::MX(host) = host {
println!("host {}", host);
assert!((*host.as_bytes().last().unwrap() - b'0') <= 8);
}
}
let mx = vec![MX {
exchanges: vec![".".to_string()],
preference: 0,
}];
assert!(mx.to_remote_hosts("domain", 10).is_none());
}
}

View file

@ -1,9 +1,16 @@
use std::borrow::Cow;
use mail_send::Credentials;
use smtp_proto::{Response, Severity};
use crate::queue::{DeliveryAttempt, Error, ErrorDetails, HostResponse, Message, Status};
use crate::{
config::{RelayHost, ServerProtocol},
queue::{DeliveryAttempt, Error, ErrorDetails, HostResponse, Message, Status},
};
pub mod dane;
pub mod delivery;
pub mod lookup;
pub mod mta_sts;
pub mod session;
@ -190,3 +197,73 @@ impl From<Box<Message>> for DeliveryAttempt {
}
}
}
enum RemoteHost<'x> {
Relay(&'x RelayHost),
MX(&'x str),
}
impl<'x> RemoteHost<'x> {
fn hostname(&self) -> &str {
match self {
RemoteHost::MX(host) => host,
RemoteHost::Relay(host) => host.address.as_str(),
}
}
fn fqdn_hostname(&self) -> Cow<'_, str> {
match self {
RemoteHost::MX(host) => {
if !host.ends_with('.') {
format!("{}.", host).into()
} else {
(*host).into()
}
}
RemoteHost::Relay(host) => host.address.as_str().into(),
}
}
fn port(&self) -> u16 {
match self {
#[cfg(test)]
RemoteHost::MX(_) => 9925,
#[cfg(not(test))]
RemoteHost::MX(_) => 25,
RemoteHost::Relay(host) => host.port,
}
}
fn credentials(&self) -> Option<&Credentials<String>> {
match self {
RemoteHost::MX(_) => None,
RemoteHost::Relay(host) => host.auth.as_ref(),
}
}
fn allow_invalid_certs(&self) -> bool {
#[cfg(test)]
{
true
}
#[cfg(not(test))]
match self {
RemoteHost::MX(_) => false,
RemoteHost::Relay(host) => host.tls_allow_invalid_certs,
}
}
fn implicit_tls(&self) -> bool {
match self {
RemoteHost::MX(_) => false,
RemoteHost::Relay(host) => host.tls_implicit,
}
}
fn is_smtp(&self) -> bool {
match self {
RemoteHost::MX(_) => true,
RemoteHost::Relay(host) => host.protocol == ServerProtocol::Smtp,
}
}
}

View file

@ -82,7 +82,7 @@ max_age: 604800",
mode: Mode::Enforce,
mx: vec![
MxPattern::Equals("mail.example.com".to_string()),
MxPattern::StartsWith(".example.net".to_string()),
MxPattern::StartsWith("example.net".to_string()),
MxPattern::Equals("backupmx.example.com".to_string()),
],
max_age: 604800,
@ -100,7 +100,7 @@ max_age: 86400
mode: Mode::Testing,
mx: vec![
MxPattern::Equals("gmail-smtp-in.l.google.com".to_string()),
MxPattern::StartsWith(".gmail-smtp-in.l.google.com".to_string()),
MxPattern::StartsWith("gmail-smtp-in.l.google.com".to_string()),
],
max_age: 86400,
},

View file

@ -145,7 +145,7 @@ impl Message {
let response = HostResponse {
hostname: ErrorDetails {
entity: params.hostname.to_string(),
details: cmd,
details: cmd.trim().to_string(),
},
response,
};
@ -354,7 +354,7 @@ impl Message {
fn build_rcpt_to(&self, rcpt: &Recipient, capabilities: &EhloResponse<String>) -> String {
let mut rcpt_to = String::with_capacity(rcpt.address.len() + 60);
let _ = write!(rcpt_to, "RCPT TO:<{}>", self.return_path);
let _ = write!(rcpt_to, "RCPT TO:<{}>", rcpt.address);
if capabilities.has_capability(EXT_DSN) {
if rcpt.has_flag(RCPT_NOTIFY_SUCCESS | RCPT_NOTIFY_FAILURE | RCPT_NOTIFY_DELAY) {
rcpt_to.push_str(" NOTIFY=");
@ -473,11 +473,7 @@ pub async fn read_data_responses<T: AsyncRead + AsyncWrite + Unpin>(
num_responses: usize,
) -> Result<Vec<Response<String>>, Status<(), Error>> {
tokio::time::timeout(smtp_client.timeout, async {
let mut responses = Vec::with_capacity(num_responses);
for _ in 0..num_responses {
responses.push(smtp_client.read().await?);
}
Ok(responses)
smtp_client.read_many(num_responses).await
})
.await
.map_err(|_| Status::timeout(hostname, "reading DATA"))?
@ -490,11 +486,22 @@ pub async fn send_message<T: AsyncRead + AsyncWrite + Unpin>(
bdat_cmd: &Option<String>,
params: &SessionParams<'_>,
) -> Result<(), Status<(), Error>> {
let raw_message = fs::read(&message.path).await.map_err(|err| {
let mut raw_message = vec![0u8; message.size];
let mut file = fs::File::open(&message.path).await.map_err(|err| {
tracing::error!(parent: params.span,
context = "queue",
event = "error",
"Failed to read message file {} from disk: {}",
"Failed to open message file {}: {}",
message.path.display(),
err);
Status::TemporaryFailure(Error::Io("Queue system error.".to_string()))
})?;
file.read_exact(&mut raw_message).await.map_err(|err| {
tracing::error!(parent: params.span,
context = "queue",
event = "error",
"Failed to read {} bytes file {} from disk: {}",
message.size,
message.path.display(),
err);
Status::TemporaryFailure(Error::Io("Queue system error.".to_string()))

View file

@ -202,7 +202,11 @@ impl DeliveryAttempt {
if has_delay {
let mut domains = std::mem::take(&mut self.message.domains);
for domain in &mut domains {
if domain.notify.due <= now {
if matches!(
&domain.status,
Status::TemporaryFailure(_) | Status::Scheduled
) && domain.notify.due <= now
{
let envelope = SimpleEnvelope::new(&self.message, &domain.domain);
if let Some(next_notify) = config
@ -497,6 +501,13 @@ impl Domain {
}
impl<T, E> Status<T, E> {
pub fn into_permanent(self) -> Self {
match self {
Status::TemporaryFailure(v) => Status::PermanentFailure(v),
v => v,
}
}
fn write_dsn_action(&self, dsn: &mut String) {
dsn.push_str("Action: ");
dsn.push_str(match self {

View file

@ -1,14 +1,11 @@
use std::sync::Arc;
use ahash::AHashSet;
use tokio::sync::mpsc;
use crate::{
config::{ConfigContext, IfBlock, List},
core::{Core, Session, SessionAddress},
tests::{
inbound::read_queue,
make_temp_dir,
session::{load_test_message, DummyIo, VerifyResponse},
ParseTestConfig,
},
@ -19,12 +16,7 @@ async fn data() {
let mut core = Core::test();
// Create temp dir for queue
let temp_dir = make_temp_dir("smtp_data_test", true);
core.queue.config.path = IfBlock::new(temp_dir.temp_dir.clone());
// Create queue receiver
let (queue_tx, mut queue_rx) = mpsc::channel(128);
core.queue.tx = queue_tx;
let mut qr = core.init_test_queue("smtp_data_test");
let mut config = &mut core.session.config.rcpt;
config.lookup_domains = IfBlock::new(Some(Arc::new(List::Local(AHashSet::from_iter([
@ -83,20 +75,30 @@ async fn data() {
// Send broken message
session
.send_message("john@doe.org", "bill@foobar.org", "From: john", "550 5.7.7")
.send_message(
"john@doe.org",
&["bill@foobar.org"],
"From: john",
"550 5.7.7",
)
.await;
// Naive Loop detection
session
.send_message("john@doe.org", "bill@foobar.org", "test:loop", "450 4.4.6")
.send_message(
"john@doe.org",
&["bill@foobar.org"],
"test:loop",
"450 4.4.6",
)
.await;
// No headers should be added to messages from 10.0.0.1
session
.send_message("john@doe.org", "bill@foobar.org", "test:no_msgid", "250")
.send_message("john@doe.org", &["bill@foobar.org"], "test:no_msgid", "250")
.await;
assert_eq!(
read_queue(&mut queue_rx).await.inner.read_message(),
qr.read_event().await.unwrap_message().read_message(),
load_test_message("no_msgid")
);
@ -111,11 +113,11 @@ async fn data() {
session.data.remote_ip = "10.0.0.3".parse().unwrap();
session.eval_session_params().await;
session
.send_message("john@doe.org", "mike@test.com", "test:no_msgid", "250")
.send_message("john@doe.org", &["mike@test.com"], "test:no_msgid", "250")
.await;
read_queue(&mut queue_rx)
qr.read_event()
.await
.inner
.unwrap_message()
.read_lines()
.assert_contains("From: ")
.assert_contains("To: ")
@ -132,13 +134,13 @@ async fn data() {
session.data.remote_ip = "10.0.0.2".parse().unwrap();
session.eval_session_params().await;
session
.send_message("john@doe.org", "bill@foobar.org", "test:no_dkim", "250")
.send_message("john@doe.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
queued_messages.push(read_queue(&mut queue_rx).await);
queued_messages.push(qr.read_event().await);
session
.send_message(
"john@doe.org",
"bill@foobar.org",
&["bill@foobar.org"],
"test:no_dkim",
"452 4.3.1",
)
@ -149,13 +151,18 @@ async fn data() {
// Only 1500 bytes are allowed in the queue to domain foobar.org
session
.send_message("jane@foobar.org", "bill@foobar.org", "test:no_dkim", "250")
.send_message(
"jane@foobar.org",
&["bill@foobar.org"],
"test:no_dkim",
"250",
)
.await;
queued_messages.push(read_queue(&mut queue_rx).await);
queued_messages.push(qr.read_event().await);
session
.send_message(
"jane@foobar.org",
"bill@foobar.org",
&["bill@foobar.org"],
"test:no_dkim",
"452 4.3.1",
)
@ -163,13 +170,18 @@ async fn data() {
// Only 1500 bytes are allowed in the queue to recipient jane@domain.net
session
.send_message("jane@foobar.org", "jane@domain.net", "test:no_dkim", "250")
.send_message(
"jane@foobar.org",
&["jane@domain.net"],
"test:no_dkim",
"250",
)
.await;
queued_messages.push(read_queue(&mut queue_rx).await);
queued_messages.push(qr.read_event().await);
session
.send_message(
"jane@foobar.org",
"jane@domain.net",
&["jane@domain.net"],
"test:no_dkim",
"452 4.3.1",
)

View file

@ -16,12 +16,7 @@ use tokio::sync::mpsc;
use crate::{
config::{AggregateFrequency, ConfigContext, IfBlock, List, Rate, VerifyStrategy},
core::{Core, Session},
tests::{
inbound::{assert_empty_queue, read_dmarc_report, read_queue},
make_temp_dir,
session::VerifyResponse,
ParseTestConfig,
},
tests::{inbound::read_dmarc_report, session::VerifyResponse, ParseTestConfig},
};
#[tokio::test]
@ -30,8 +25,7 @@ async fn dmarc() {
let ctx = ConfigContext::default().parse_signatures();
// Create temp dir for queue
let temp_dir = make_temp_dir("smtp_dmarc_test", true);
core.queue.config.path = IfBlock::new(temp_dir.temp_dir.clone());
let mut qr = core.init_test_queue("smtp_dmarc_test");
// Add SPF, DKIM and DMARC records
core.resolvers.dns.txt_add(
@ -95,10 +89,8 @@ async fn dmarc() {
Instant::now() + Duration::from_secs(5),
);
// Create queue and report channels
let (queue_tx, mut queue_rx) = mpsc::channel(128);
// Create report channels
let (report_tx, mut report_rx) = mpsc::channel(128);
core.queue.tx = queue_tx;
core.report.tx = report_tx;
let mut config = &mut core.session.config.rcpt;
@ -149,7 +141,7 @@ async fn dmarc() {
session.mail_from("bill@example.com", "550 5.7.23").await;
// Expect SPF auth failure report
let message = read_queue(&mut queue_rx).await.inner;
let message = qr.read_event().await.unwrap_message();
assert_eq!(
message.recipients.last().unwrap().address,
"spf-failures@example.com"
@ -163,7 +155,7 @@ async fn dmarc() {
// Second DKIM failure report should be rate limited
session.mail_from("bill@example.com", "550 5.7.23").await;
assert_empty_queue(&mut queue_rx);
qr.assert_empty_queue();
// Invalid DKIM signatures should be rejected
session.data.remote_ip = "10.0.0.1".parse().unwrap();
@ -171,14 +163,14 @@ async fn dmarc() {
session
.send_message(
"bill@example.com",
"jdoe@example.com",
&["jdoe@example.com"],
"test:invalid_dkim",
"550 5.7.20",
)
.await;
// Expect DKIM auth failure report
let message = read_queue(&mut queue_rx).await.inner;
let message = qr.read_event().await.unwrap_message();
assert_eq!(
message.recipients.last().unwrap().address,
"dkim-failures@example.com"
@ -194,36 +186,36 @@ async fn dmarc() {
session
.send_message(
"bill@example.com",
"jdoe@example.com",
&["jdoe@example.com"],
"test:invalid_dkim",
"550 5.7.20",
)
.await;
assert_empty_queue(&mut queue_rx);
qr.assert_empty_queue();
// Invalid ARC should be rejected
session
.send_message(
"bill@example.com",
"jdoe@example.com",
&["jdoe@example.com"],
"test:invalid_arc",
"550 5.7.29",
)
.await;
assert_empty_queue(&mut queue_rx);
qr.assert_empty_queue();
// Unaligned DMARC should be rejected
session
.send_message(
"joe@foobar.com",
"jdoe@example.com",
&["jdoe@example.com"],
"test:dkim",
"550 5.7.1",
)
.await;
// Expect DMARC auth failure report
let message = read_queue(&mut queue_rx).await.inner;
let message = qr.read_event().await.unwrap_message();
assert_eq!(
message.recipients.last().unwrap().address,
"dmarc-failures@example.com"
@ -247,20 +239,25 @@ async fn dmarc() {
session
.send_message(
"joe@foobar.com",
"jdoe@example.com",
&["jdoe@example.com"],
"test:dkim",
"550 5.7.1",
)
.await;
assert_empty_queue(&mut queue_rx);
qr.assert_empty_queue();
// Messagess passing DMARC should be accepted
session
.send_message("bill@example.com", "jdoe@example.com", "test:dkim", "250")
.send_message(
"bill@example.com",
&["jdoe@example.com"],
"test:dkim",
"250",
)
.await;
read_queue(&mut queue_rx)
qr.read_event()
.await
.inner
.unwrap_message()
.read_lines()
.assert_contains("dkim=pass")
.assert_contains("spf=pass")

View file

@ -3,10 +3,12 @@ use std::time::Duration;
use tokio::sync::mpsc::{self, error::TryRecvError};
use crate::{
queue::{self, Message, Schedule},
queue::{self, Message, Schedule, WorkerResult},
reporting::{self, DmarcEvent, TlsEvent},
};
use super::QueueReceiver;
pub mod auth;
pub mod basic;
pub mod data;
@ -19,22 +21,64 @@ pub mod sign;
pub mod throttle;
pub mod vrfy;
pub async fn read_queue(rx: &mut mpsc::Receiver<queue::Event>) -> Schedule<Box<Message>> {
match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await {
Ok(Some(event)) => match event {
queue::Event::Queue(message) => message,
_ => panic!("Unexpected event."),
},
Ok(None) => panic!("Channel closed."),
Err(_) => panic!("No queue event received."),
impl QueueReceiver {
pub async fn read_event(&mut self) -> queue::Event {
match tokio::time::timeout(Duration::from_millis(100), self.queue_rx.recv()).await {
Ok(Some(event)) => event,
Ok(None) => panic!("Channel closed."),
Err(_) => panic!("No queue event received."),
}
}
pub async fn try_read_event(&mut self) -> Option<queue::Event> {
match tokio::time::timeout(Duration::from_millis(100), self.queue_rx.recv()).await {
Ok(Some(event)) => Some(event),
Ok(None) => panic!("Channel closed."),
Err(_) => None,
}
}
pub fn assert_empty_queue(&mut self) {
match self.queue_rx.try_recv() {
Err(TryRecvError::Empty) => (),
Ok(event) => panic!("Expected empty queue but got {:?}", event),
Err(err) => panic!("Queue error: {:?}", err),
}
}
}
pub fn assert_empty_queue(rx: &mut mpsc::Receiver<queue::Event>) {
match rx.try_recv() {
Err(TryRecvError::Empty) => (),
Ok(event) => panic!("Expected empty queue but got {:?}", event),
Err(err) => panic!("Queue error: {:?}", err),
impl queue::Event {
pub fn unwrap_message(self) -> Box<Message> {
match self {
queue::Event::Queue(message) => message.inner,
e => panic!("Unexpected event: {:?}", e),
}
}
pub fn unwrap_schedule(self) -> Schedule<Box<Message>> {
match self {
queue::Event::Queue(message) => message,
e => panic!("Unexpected event: {:?}", e),
}
}
pub fn unwrap_result(self) -> WorkerResult {
match self {
queue::Event::Done(result) => result,
queue::Event::Queue(message) => {
panic!("Unexpected message: {}", message.inner.read_message());
}
e => panic!("Unexpected event: {:?}", e),
}
}
pub fn unwrap_done(self) {
match self {
queue::Event::Done(WorkerResult::Done) => (),
queue::Event::Queue(message) => {
panic!("Unexpected message: {}", message.inner.read_message());
}
e => panic!("Unexpected event: {:?}", e),
}
}
}
@ -61,14 +105,14 @@ pub async fn read_tls_report(rx: &mut mpsc::Receiver<reporting::Event>) -> Box<T
}
impl Message {
fn read_message(&self) -> String {
pub fn read_message(&self) -> String {
let mut buf = vec![0u8; self.size];
let mut file = std::fs::File::open(&self.path).unwrap();
std::io::Read::read_exact(&mut file, &mut buf).unwrap();
String::from_utf8(buf).unwrap()
}
fn read_lines(&self) -> Vec<String> {
pub fn read_lines(&self) -> Vec<String> {
self.read_message()
.split('\n')
.map(|l| l.to_string())

View file

@ -8,12 +8,11 @@ use mail_auth::{
common::{parse::TxtRecordParser, verify::DomainKey},
spf::Spf,
};
use tokio::sync::mpsc;
use crate::{
config::{Config, ConfigContext, IfBlock, List, VerifyStrategy},
core::{Core, Session},
tests::{inbound::read_queue, make_temp_dir, session::VerifyResponse, ParseTestConfig},
tests::{session::VerifyResponse, ParseTestConfig},
};
const SIGNATURES: &str = "
@ -59,8 +58,7 @@ async fn sign_and_seal() {
let mut core = Core::test();
// Create temp dir for queue
let temp_dir = make_temp_dir("smtp_sign_test", true);
core.queue.config.path = IfBlock::new(temp_dir.temp_dir.clone());
let mut qr = core.init_test_queue("smtp_sign_test");
// Add SPF, DKIM and DMARC records
core.resolvers.dns.txt_add(
@ -101,10 +99,6 @@ async fn sign_and_seal() {
Instant::now() + Duration::from_secs(5),
);
// Create queue and report channels
let (queue_tx, mut queue_rx) = mpsc::channel(128);
core.queue.tx = queue_tx;
let mut config = &mut core.session.config.rcpt;
config.lookup_domains = IfBlock::new(Some(Arc::new(List::Local(AHashSet::from_iter([
"example.com".to_string(),
@ -143,11 +137,16 @@ async fn sign_and_seal() {
session.eval_session_params().await;
session.ehlo("mx.example.com").await;
session
.send_message("bill@foobar.org", "jdoe@example.com", "test:no_dkim", "250")
.send_message(
"bill@foobar.org",
&["jdoe@example.com"],
"test:no_dkim",
"250",
)
.await;
read_queue(&mut queue_rx)
qr.read_event()
.await
.inner
.unwrap_message()
.read_lines()
.assert_contains(
"DKIM-Signature: v=1; a=rsa-sha256; s=rsa; d=example.com; c=simple/relaxed;",
@ -155,11 +154,11 @@ async fn sign_and_seal() {
// Test ARC verify and seal
session
.send_message("bill@foobar.org", "jdoe@example.com", "test:arc", "250")
.send_message("bill@foobar.org", &["jdoe@example.com"], "test:arc", "250")
.await;
read_queue(&mut queue_rx)
qr.read_event()
.await
.inner
.unwrap_message()
.read_lines()
.assert_contains("ARC-Seal: i=3; a=ed25519-sha256; s=ed; d=example.com; cv=pass;")
.assert_contains(

View file

@ -4,7 +4,7 @@ use dashmap::DashMap;
use mail_auth::{
common::lru::{DnsCache, LruCache},
trust_dns_resolver::config::{ResolverConfig, ResolverOpts},
Resolver,
IpLookupStrategy, Resolver,
};
use mail_send::smtp::tls::build_tls_connector;
use smtp_proto::{AUTH_LOGIN, AUTH_PLAIN};
@ -238,6 +238,7 @@ impl QueueConfig {
ipv4: IfBlock::new(vec![]),
ipv6: IfBlock::new(vec![]),
},
ip_strategy: IfBlock::new(IpLookupStrategy::Ipv4thenIpv6),
tls: QueueOutboundTls {
dane: IfBlock::new(crate::config::RequireOptional::Optional),
mta_sts: IfBlock::new(crate::config::RequireOptional::Optional),
@ -378,3 +379,38 @@ impl Drop for TempDir {
}
}
}
pub fn add_test_certs(config: &str) -> String {
let mut cert_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
cert_path.push("resources");
cert_path.push("tests");
cert_path.push("certs");
let mut cert = cert_path.clone();
cert.push("tls_cert.pem");
let mut pk = cert_path.clone();
pk.push("tls_privatekey.pem");
config
.replace("{CERT}", cert.as_path().to_str().unwrap())
.replace("{PK}", pk.as_path().to_str().unwrap())
}
pub struct QueueReceiver {
_temp_dir: TempDir,
pub queue_rx: mpsc::Receiver<crate::queue::Event>,
}
impl Core {
pub fn init_test_queue(&mut self, test_name: &str) -> QueueReceiver {
let _temp_dir = make_temp_dir(test_name, true);
self.queue.config.path = IfBlock::new(_temp_dir.temp_dir.clone());
let (queue_tx, queue_rx) = mpsc::channel(128);
self.queue.tx = queue_tx;
QueueReceiver {
_temp_dir,
queue_rx,
}
}
}

View file

@ -0,0 +1,126 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use mail_auth::MX;
use smtp_proto::{MAIL_REQUIRETLS, MAIL_RET_HDRS, MAIL_SMTPUTF8, RCPT_NOTIFY_NEVER};
use crate::{
config::IfBlock,
core::{Core, Session},
queue::{manager::Queue, DeliveryAttempt},
tests::{outbound::start_test_server, session::VerifyResponse},
};
#[tokio::test]
async fn extensions() {
/*tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();*/
// Start test server
let mut core = Core::test();
core.session.config.rcpt.relay = IfBlock::new(true);
core.session.config.data.max_message_size = IfBlock::new(1500);
core.session.config.extensions.dsn = IfBlock::new(true);
core.session.config.extensions.requiretls = IfBlock::new(true);
let mut remote_qr = core.init_test_queue("smtp_ext_remote");
let _rx = start_test_server(core.into(), true);
// Add mock DNS entries
let mut core = Core::test();
core.resolvers.dns.mx_add(
"foobar.org",
vec![MX {
exchanges: vec!["mx.foobar.org".to_string()],
preference: 10,
}],
Instant::now() + Duration::from_secs(10),
);
core.resolvers.dns.ipv4_add(
"mx.foobar.org",
vec!["127.0.0.1".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);
// Successful delivery with DSN
let mut local_qr = core.init_test_queue("smtp_ext_local");
core.session.config.rcpt.relay = IfBlock::new(true);
core.session.config.extensions.dsn = IfBlock::new(true);
let core = Arc::new(core);
let mut queue = Queue::default();
let mut session = Session::test(core.clone());
session.data.remote_ip = "10.0.0.1".parse().unwrap();
session.eval_session_params().await;
session.ehlo("mx.test.org").await;
session
.send_message(
"john@test.org",
&["<bill@foobar.org> NOTIFY=SUCCESS,FAILURE"],
"test:no_dkim",
"250",
)
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
.await;
local_qr
.read_event()
.await
.unwrap_message()
.read_lines()
.assert_contains("<bill@foobar.org> (delivered to")
.assert_contains("Final-Recipient: rfc822;bill@foobar.org")
.assert_contains("Action: delivered");
local_qr.read_event().await.unwrap_done();
remote_qr
.read_event()
.await
.unwrap_message()
.read_lines()
.assert_contains("using TLSv1.3 with cipher");
// Test SIZE extension
session
.send_message("john@test.org", &["bill@foobar.org"], "test:arc", "250")
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
.await;
local_qr
.read_event()
.await
.unwrap_message()
.read_lines()
.assert_contains("<bill@foobar.org> (host 'mx.foobar.org' rejected command 'MAIL FROM:")
.assert_contains("Action: failed")
.assert_contains("Diagnostic-Code: smtp;552")
.assert_contains("Status: 5.3.4");
local_qr.read_event().await.unwrap_done();
remote_qr.assert_empty_queue();
// Test DSN, SMTPUTF8 and REQUIRETLS extensions
session
.send_message(
"<john@test.org> ENVID=abc123 RET=HDRS REQUIRETLS SMTPUTF8",
&["<bill@foobar.org> NOTIFY=NEVER"],
"test:no_dkim",
"250",
)
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
.await;
local_qr.read_event().await.unwrap_done();
let message = remote_qr.read_event().await.unwrap_message();
assert_eq!(message.env_id, Some("abc123".to_string()));
assert!((message.flags & MAIL_RET_HDRS) != 0);
assert!((message.flags & MAIL_REQUIRETLS) != 0);
assert!((message.flags & MAIL_SMTPUTF8) != 0);
assert!((message.recipients.last().unwrap().flags & RCPT_NOTIFY_NEVER) != 0);
}

170
src/tests/outbound/lmtp.rs Normal file
View file

@ -0,0 +1,170 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use crate::{
config::{Config, ConfigContext, IfBlock},
core::{Core, Session},
queue::{manager::Queue, DeliveryAttempt, Event, WorkerResult},
tests::{outbound::start_test_server, session::VerifyResponse, ParseTestConfig},
};
const REMOTE: &str = "
[remote.lmtp]
address = lmtp.foobar.org
port = 9924
protocol = 'lmtp'
concurrency = 5
[remote.lmtp.tls]
implicit = true
allow-invalid-certs = true
";
#[tokio::test]
async fn lmtp_delivery() {
/*tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();*/
// Start test server
let mut core = Core::test();
core.session.config.rcpt.relay = IfBlock::new(true);
core.session.config.extensions.dsn = IfBlock::new(true);
let mut remote_qr = core.init_test_queue("smtp_delivery_remote");
let _rx = start_test_server(core.into(), false);
// Add mock DNS entries
let mut core = Core::test();
core.resolvers.dns.ipv4_add(
"lmtp.foobar.org",
vec!["127.0.0.1".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);
// Multiple delivery attempts
let mut local_qr = core.init_test_queue("smtp_delivery_local");
let mut ctx = ConfigContext::default();
let config = Config::parse(REMOTE).unwrap();
config.parse_remote_hosts(&mut ctx).unwrap();
core.queue.config.next_hop = "[{if = 'rcpt-domain', eq = 'foobar.org', then = 'lmtp'},
{else = false}]"
.parse_if::<Option<String>>(&ctx)
.into_relay_host(&ctx)
.unwrap();
core.session.config.rcpt.relay = IfBlock::new(true);
core.session.config.rcpt.max_recipients = IfBlock::new(100);
core.session.config.extensions.dsn = IfBlock::new(true);
let mut config = &mut core.queue.config;
config.retry = IfBlock::new(vec![Duration::from_millis(100)]);
config.notify = "[{if = 'rcpt-domain', eq = 'foobar.org', then = ['100ms', '200ms']},
{else = ['100ms']}]"
.parse_if(&ctx);
config.expire = "[{if = 'rcpt-domain', eq = 'foobar.org', then = '400ms'},
{else = '500ms'}]"
.parse_if(&ctx);
config.timeout.data = IfBlock::new(Duration::from_millis(50));
let core = Arc::new(core);
let mut queue = Queue::default();
let mut session = Session::test(core.clone());
session.data.remote_ip = "10.0.0.1".parse().unwrap();
session.eval_session_params().await;
session.ehlo("mx.test.org").await;
session
.send_message(
"john@test.org",
&[
"<bill@foobar.org> NOTIFY=SUCCESS,DELAY,FAILURE",
"<jane@foobar.org> NOTIFY=SUCCESS,DELAY,FAILURE",
"<john@foobar.org> NOTIFY=SUCCESS,DELAY,FAILURE",
"<delay@foobar.org> NOTIFY=SUCCESS,DELAY,FAILURE",
"<fail@foobar.org> NOTIFY=SUCCESS,DELAY,FAILURE",
"<invalid@domain.org> NOTIFY=SUCCESS,DELAY,FAILURE",
],
"test:no_dkim",
"250",
)
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
.await;
let mut dsn = Vec::new();
loop {
match local_qr.try_read_event().await {
Some(Event::Queue(message)) => {
dsn.push(message.inner);
}
Some(Event::Done(wr)) => match wr {
WorkerResult::Done => {
break;
}
WorkerResult::Retry(retry) => {
queue.main.push(retry);
}
WorkerResult::OnHold(_) => unreachable!(),
},
None | Some(Event::Stop) => break,
}
if !queue.main.is_empty() {
tokio::time::sleep(queue.wake_up_time()).await;
DeliveryAttempt::from(queue.next_due().unwrap())
.try_deliver(core.clone(), &mut queue)
.await;
}
}
assert!(queue.main.is_empty());
assert_eq!(dsn.len(), 4);
let mut dsn = dsn.into_iter();
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<bill@foobar.org> (delivered to")
.assert_contains("<jane@foobar.org> (delivered to")
.assert_contains("<john@foobar.org> (delivered to")
.assert_contains("<invalid@domain.org> (failed to lookup")
.assert_contains("<fail@foobar.org> (host 'lmtp.foobar.org' rejected command");
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<delay@foobar.org> (host 'lmtp.foobar.org' rejected")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<delay@foobar.org> (host 'lmtp.foobar.org' rejected")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<delay@foobar.org> (host 'lmtp.foobar.org' rejected")
.assert_contains("Action: failed");
assert_eq!(
remote_qr
.read_event()
.await
.unwrap_message()
.recipients
.into_iter()
.map(|r| r.address)
.collect::<Vec<_>>(),
vec![
"bill@foobar.org".to_string(),
"jane@foobar.org".to_string(),
"john@foobar.org".to_string()
]
);
remote_qr.assert_empty_queue();
}

View file

@ -0,0 +1,60 @@
use std::sync::Arc;
use tokio::sync::watch;
use crate::{
config::{Config, ConfigContext, ServerProtocol},
core::Core,
};
use super::add_test_certs;
pub mod extensions;
pub mod lmtp;
pub mod smtp;
pub mod throttle;
const SERVER: &str = "
[server]
hostname = 'mx.example.org'
greeting = 'Test SMTP instance'
protocol = 'smtp'
[server.listener.smtp-debug]
bind = ['127.0.0.1:9925']
[server.listener.lmtp-debug]
bind = ['127.0.0.1:9924']
protocol = 'lmtp'
tls.implicit = true
[server.socket]
reuse-addr = true
[server.tls]
enable = true
implicit = false
certificate = 'default'
[certificate.default]
cert = 'file://{CERT}'
private-key = 'file://{PK}'
";
pub fn start_test_server(core: Arc<Core>, smtp: bool) -> watch::Sender<bool> {
// Spawn listeners
let mut ctx = ConfigContext::default();
Config::parse(&add_test_certs(SERVER))
.unwrap()
.parse_servers(&mut ctx)
.unwrap();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
for server in ctx.servers {
if (smtp && server.protocol == ServerProtocol::Smtp)
|| (!smtp && server.protocol == ServerProtocol::Lmtp)
{
server.spawn(core.clone(), shutdown_rx.clone()).unwrap();
}
}
shutdown_tx
}

202
src/tests/outbound/smtp.rs Normal file
View file

@ -0,0 +1,202 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use mail_auth::MX;
use crate::{
config::{ConfigContext, IfBlock},
core::{Core, Session},
queue::{manager::Queue, DeliveryAttempt, Event, WorkerResult},
tests::{outbound::start_test_server, session::VerifyResponse, ParseTestConfig},
};
#[tokio::test]
async fn smtp_delivery() {
/*tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();*/
// Start test server
let mut core = Core::test();
core.session.config.rcpt.relay = IfBlock::new(true);
core.session.config.extensions.dsn = IfBlock::new(true);
let mut remote_qr = core.init_test_queue("smtp_delivery_remote");
let _rx = start_test_server(core.into(), true);
// Add mock DNS entries
let mut core = Core::test();
core.resolvers.dns.mx_add(
"foobar.org",
vec![
MX {
exchanges: vec!["mx1.foobar.org".to_string()],
preference: 10,
},
MX {
exchanges: vec!["mx2.foobar.org".to_string()],
preference: 20,
},
],
Instant::now() + Duration::from_secs(10),
);
core.resolvers.dns.mx_add(
"foobar.net",
vec![MX {
exchanges: vec!["mx1.foobar.net".to_string(), "mx2.foobar.net".to_string()],
preference: 10,
}],
Instant::now() + Duration::from_secs(10),
);
core.resolvers.dns.ipv4_add(
"mx1.foobar.org",
vec!["127.0.0.1".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);
core.resolvers.dns.ipv4_add(
"mx2.foobar.org",
vec!["127.0.0.1".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);
core.resolvers.dns.ipv4_add(
"mx1.foobar.net",
vec!["127.0.0.1".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);
core.resolvers.dns.ipv4_add(
"mx2.foobar.net",
vec!["127.0.0.1".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);
// Multiple delivery attempts
let mut local_qr = core.init_test_queue("smtp_delivery_local");
core.session.config.rcpt.relay = IfBlock::new(true);
core.session.config.rcpt.max_recipients = IfBlock::new(100);
core.session.config.extensions.dsn = IfBlock::new(true);
let mut config = &mut core.queue.config;
config.retry = IfBlock::new(vec![Duration::from_millis(100)]);
config.notify = "[{if = 'rcpt-domain', eq = 'foobar.org', then = ['100ms', '200ms']},
{else = ['100ms']}]"
.parse_if(&ConfigContext::default());
config.expire = "[{if = 'rcpt-domain', eq = 'foobar.org', then = '400ms'},
{else = '500ms'}]"
.parse_if(&ConfigContext::default());
let core = Arc::new(core);
let mut queue = Queue::default();
let mut session = Session::test(core.clone());
session.data.remote_ip = "10.0.0.1".parse().unwrap();
session.eval_session_params().await;
session.ehlo("mx.test.org").await;
session
.send_message(
"john@test.org",
&[
"<ok@foobar.org> NOTIFY=SUCCESS,DELAY,FAILURE",
"<delay@foobar.org> NOTIFY=SUCCESS,DELAY,FAILURE",
"<fail@foobar.org> NOTIFY=SUCCESS,DELAY,FAILURE",
"<ok@foobar.net> NOTIFY=SUCCESS,DELAY,FAILURE",
"<delay@foobar.net> NOTIFY=SUCCESS,DELAY,FAILURE",
"<fail@foobar.net> NOTIFY=SUCCESS,DELAY,FAILURE",
"<invalid@domain.org> NOTIFY=SUCCESS,DELAY,FAILURE",
],
"test:no_dkim",
"250",
)
.await;
DeliveryAttempt::from(local_qr.read_event().await.unwrap_message())
.try_deliver(core.clone(), &mut queue)
.await;
let mut dsn = Vec::new();
loop {
match local_qr.try_read_event().await {
Some(Event::Queue(message)) => {
dsn.push(message.inner);
}
Some(Event::Done(wr)) => match wr {
WorkerResult::Done => {
break;
}
WorkerResult::Retry(retry) => {
queue.main.push(retry);
}
WorkerResult::OnHold(_) => unreachable!(),
},
None | Some(Event::Stop) => break,
}
if !queue.main.is_empty() {
tokio::time::sleep(queue.wake_up_time()).await;
DeliveryAttempt::from(queue.next_due().unwrap())
.try_deliver(core.clone(), &mut queue)
.await;
}
}
assert!(queue.main.is_empty());
assert_eq!(dsn.len(), 5);
let mut dsn = dsn.into_iter();
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<ok@foobar.net> (delivered to")
.assert_contains("<ok@foobar.org> (delivered to")
.assert_contains("<invalid@domain.org> (failed to lookup")
.assert_contains("<fail@foobar.net> (host ")
.assert_contains("<fail@foobar.org> (host ");
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<delay@foobar.net> (host ")
.assert_contains("<delay@foobar.org> (host ")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<delay@foobar.org> (host ")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<delay@foobar.org> (host ");
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<delay@foobar.net> (host ")
.assert_contains("Action: failed");
assert_eq!(
remote_qr
.read_event()
.await
.unwrap_message()
.recipients
.into_iter()
.map(|r| r.address)
.collect::<Vec<_>>(),
vec!["ok@foobar.net".to_string()]
);
assert_eq!(
remote_qr
.read_event()
.await
.unwrap_message()
.recipients
.into_iter()
.map(|r| r.address)
.collect::<Vec<_>>(),
vec!["ok@foobar.org".to_string()]
);
remote_qr.assert_empty_queue();
}

View file

@ -0,0 +1,3 @@
pub fn test() {
let imple = "tr";
}

View file

@ -5,19 +5,16 @@ use std::{
};
use smtp_proto::{Response, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_SUCCESS};
use tokio::{fs::File, io::AsyncReadExt, sync::mpsc};
use tokio::{fs::File, io::AsyncReadExt};
use crate::{
config::{ConfigContext, IfBlock},
config::ConfigContext,
core::Core,
queue::{
DeliveryAttempt, Domain, Error, ErrorDetails, HostResponse, Message, Recipient, Schedule,
Status,
},
tests::{
inbound::{assert_empty_queue, read_queue},
make_temp_dir, ParseTestConfig,
},
tests::ParseTestConfig,
};
#[tokio::test]
@ -91,21 +88,16 @@ async fn generate_dsn() {
.unwrap();
// Create temp dir for queue
let temp_dir = make_temp_dir("smtp_dsn_test", true);
core.queue.config.path = IfBlock::new(temp_dir.temp_dir.clone());
// Create queue and report channels
let (queue_tx, mut queue_rx) = mpsc::channel(128);
core.queue.tx = queue_tx;
let mut qr = core.init_test_queue("smtp_dsn_test");
// Disabled DSN
core.queue.send_dsn(&mut attempt).await;
assert_empty_queue(&mut queue_rx);
qr.assert_empty_queue();
// Failure DSN
attempt.message.recipients[0].flags = flags;
core.queue.send_dsn(&mut attempt).await;
compare_dsn(read_queue(&mut queue_rx).await.inner, "failure.eml").await;
compare_dsn(qr.read_event().await.unwrap_message(), "failure.eml").await;
// Success DSN
attempt.message.recipients.push(Recipient {
@ -124,7 +116,7 @@ async fn generate_dsn() {
orcpt: None,
});
core.queue.send_dsn(&mut attempt).await;
compare_dsn(read_queue(&mut queue_rx).await.inner, "success.eml").await;
compare_dsn(qr.read_event().await.unwrap_message(), "success.eml").await;
// Delay DSN
attempt.message.recipients.push(Recipient {
@ -136,7 +128,7 @@ async fn generate_dsn() {
orcpt: "jdoe@example.org".to_string().into(),
});
core.queue.send_dsn(&mut attempt).await;
compare_dsn(read_queue(&mut queue_rx).await.inner, "delay.eml").await;
compare_dsn(qr.read_event().await.unwrap_message(), "delay.eml").await;
// Mixed DSN
for rcpt in &mut attempt.message.recipients {
@ -144,7 +136,7 @@ async fn generate_dsn() {
}
attempt.message.domains[0].notify.due = Instant::now();
core.queue.send_dsn(&mut attempt).await;
compare_dsn(read_queue(&mut queue_rx).await.inner, "mixed.eml").await;
compare_dsn(qr.read_event().await.unwrap_message(), "mixed.eml").await;
// Load queue
let queue = core.queue.read_queue().await;

View file

@ -1,3 +1,4 @@
pub mod dsn;
pub mod manager;
pub mod retry;
pub mod serialize;

209
src/tests/queue/retry.rs Normal file
View file

@ -0,0 +1,209 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use crate::{
config::{ConfigContext, IfBlock},
core::{Core, Session},
queue::{manager::Queue, DeliveryAttempt, Event, WorkerResult},
tests::{session::VerifyResponse, ParseTestConfig},
};
#[tokio::test]
async fn queue_retry() {
/*tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.finish(),
)
.unwrap();*/
let mut core = Core::test();
// Create temp dir for queue
let mut qr = core.init_test_queue("smtp_queue_retry_test");
let mut config = &mut core.session.config.rcpt;
config.relay = IfBlock::new(true);
let mut config = &mut core.session.config.extensions;
config.deliver_by = IfBlock::new(Some(Duration::from_secs(86400)));
config.future_release = IfBlock::new(Some(Duration::from_secs(86400)));
let mut config = &mut core.queue.config;
config.retry = IfBlock::new(vec![
Duration::from_millis(100),
Duration::from_millis(200),
Duration::from_millis(300),
]);
config.notify = "[{if = 'sender-domain', eq = 'test.org', then = ['150ms', '200ms']},
{else = ['15h', '22h']}]"
.parse_if(&ConfigContext::default());
config.expire = "[{if = 'sender-domain', eq = 'test.org', then = '600ms'},
{else = '1d'}]"
.parse_if(&ConfigContext::default());
// Create test message
let core = Arc::new(core);
let mut queue = Queue::default();
let mut session = Session::test(core.clone());
session.data.remote_ip = "10.0.0.1".parse().unwrap();
session.eval_session_params().await;
session.ehlo("mx.test.org").await;
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
.await;
let attempt = DeliveryAttempt::from(qr.read_event().await.unwrap_message());
// Expect a failed DSN
let path = attempt.message.path.clone();
attempt.try_deliver(core.clone(), &mut queue).await;
let message = qr.read_event().await.unwrap_message();
assert_eq!(message.return_path, "");
assert_eq!(message.domains.first().unwrap().domain, "test.org");
assert_eq!(message.recipients.first().unwrap().address, "john@test.org");
message
.read_lines()
.assert_contains("Content-Type: multipart/report")
.assert_contains("Final-Recipient: rfc822;bill@foobar.org")
.assert_contains("Action: failed");
qr.read_event().await.unwrap_done();
assert!(!path.exists());
// Expect a failed DSN for foobar.org, followed by two delayed DSN and
// a final failed DSN for _dns_error.org.
session
.send_message(
"john@test.org",
&["bill@foobar.org", "jane@_dns_error.org"],
"test:no_dkim",
"250",
)
.await;
let attempt = DeliveryAttempt::from(qr.read_event().await.unwrap_message());
let path = attempt.message.path.clone();
let mut dsn = Vec::new();
let mut num_retries = 0;
attempt.try_deliver(core.clone(), &mut queue).await;
loop {
match qr.try_read_event().await {
Some(Event::Queue(message)) => {
dsn.push(message.inner);
}
Some(Event::Done(wr)) => match wr {
WorkerResult::Done => break,
WorkerResult::Retry(retry) => {
queue.main.push(retry);
num_retries += 1;
}
WorkerResult::OnHold(_) => unreachable!(),
},
None | Some(Event::Stop) => break,
}
if !queue.main.is_empty() {
tokio::time::sleep(queue.wake_up_time()).await;
DeliveryAttempt::from(queue.next_due().unwrap())
.try_deliver(core.clone(), &mut queue)
.await;
}
}
assert!(queue.main.is_empty());
assert_eq!(num_retries, 3);
assert_eq!(dsn.len(), 4);
assert!(!path.exists());
let mut dsn = dsn.into_iter();
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<bill@foobar.org> (failed to lookup 'foobar.org'")
.assert_contains("Final-Recipient: rfc822;bill@foobar.org")
.assert_contains("Action: failed");
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<jane@_dns_error.org> (failed to lookup '_dns_error.org'")
.assert_contains("Final-Recipient: rfc822;jane@_dns_error.org")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<jane@_dns_error.org> (failed to lookup '_dns_error.org'")
.assert_contains("Final-Recipient: rfc822;jane@_dns_error.org")
.assert_contains("Action: delayed");
dsn.next()
.unwrap()
.read_lines()
.assert_contains("<jane@_dns_error.org> (failed to lookup '_dns_error.org'")
.assert_contains("Final-Recipient: rfc822;jane@_dns_error.org")
.assert_contains("Action: failed");
// Test FUTURERELEASE + DELIVERBY (RETURN)
session.data.remote_ip = "10.0.0.2".parse().unwrap();
session.eval_session_params().await;
session
.send_message(
"<bill@foobar.org> HOLDFOR=60 BY=3600;R",
&["john@test.net"],
"test:no_dkim",
"250",
)
.await;
let now = Instant::now();
let schedule = qr.read_event().await.unwrap_schedule();
assert!([59, 60].contains(&schedule.due.duration_since(now).as_secs()));
assert!([59, 60].contains(
&schedule
.inner
.next_delivery_event()
.duration_since(now)
.as_secs()
));
assert!([3599, 3600].contains(
&schedule
.inner
.domains
.first()
.unwrap()
.expires
.duration_since(now)
.as_secs()
));
assert!([54059, 54060].contains(
&schedule
.inner
.domains
.first()
.unwrap()
.notify
.due
.duration_since(now)
.as_secs()
));
// Test DELIVERBY (NOTIFY)
session
.send_message(
"<bill@foobar.org> BY=3600;N",
&["john@test.net"],
"test:no_dkim",
"250",
)
.await;
let now = Instant::now();
let schedule = qr.read_event().await.unwrap_schedule();
assert!([3599, 3600].contains(
&schedule
.inner
.domains
.first()
.unwrap()
.notify
.due
.duration_since(now)
.as_secs()
));
}

View file

@ -4,16 +4,13 @@ use std::{
};
use smtp_proto::{Response, MAIL_REQUIRETLS, MAIL_SMTPUTF8, RCPT_CONNEG, RCPT_NOTIFY_FAILURE};
use tokio::sync::mpsc;
use crate::{
config::IfBlock,
core::Core,
queue::{
Domain, Error, ErrorDetails, HostResponse, Message, Recipient, Schedule, Status,
RCPT_STATUS_CHANGED,
},
tests::{inbound::read_queue, make_temp_dir},
};
#[tokio::test]
@ -21,12 +18,7 @@ async fn queue_serialize() {
let mut core = Core::test();
// Create temp dir for queue
let temp_dir = make_temp_dir("smtp_queue_serialize_test", true);
core.queue.config.path = IfBlock::new(temp_dir.temp_dir.clone());
// Create queue receiver
let (queue_tx, mut queue_rx) = mpsc::channel(128);
core.queue.tx = queue_tx;
let mut qr = core.init_test_queue("smtp_queue_serialize_test");
// Create test message
let message = Message {
@ -91,7 +83,7 @@ async fn queue_serialize() {
)
.await
);
let mut message = read_queue(&mut queue_rx).await.inner;
let mut message = qr.read_event().await.unwrap_message();
// Deserialize
assert_msg_eq(

View file

@ -40,12 +40,12 @@ allow-invalid-certs = true
#[tokio::test]
async fn remote_imap() {
// Enable logging
tracing::subscriber::set_global_default(
/*tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.finish(),
)
.unwrap();
.unwrap();*/
// Spawn mock LMTP server
let shutdown = spawn_mock_imap_server(5);

View file

@ -214,7 +214,7 @@ async fn accept_smtp(stream: TcpStream, acceptor: Arc<TlsAcceptor>, in_flight: O
if buf.contains("ok") {
format!("250 {}\r\n", buf.split_once(' ').unwrap().1)
} else {
"550-I refuse to\r\n550 accept that recipient.\r\n".to_string()
"550-I refuse to\r\n550 verify that recipient.\r\n".to_string()
}
} else if buf.starts_with("EXPN") {
if buf.contains("ok") {

View file

@ -3,7 +3,6 @@ use std::{path::PathBuf, sync::Arc};
use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
config::ServerProtocol,
core::{Core, ServerInstance, Session, SessionData, SessionParameters, State},
inbound::IsTls,
};
@ -127,22 +126,34 @@ impl Session<DummyIo> {
}
pub async fn mail_from(&mut self, from: &str, expected_code: &str) {
self.ingest(format!("MAIL FROM:<{}>\r\n", from).as_bytes())
.await
.unwrap();
self.ingest(
if !from.starts_with("<") {
format!("MAIL FROM:<{}>\r\n", from)
} else {
format!("MAIL FROM:{}\r\n", from)
}
.as_bytes(),
)
.await
.unwrap();
self.response().assert_code(expected_code);
}
pub async fn rcpt_to(&mut self, to: &str, expected_code: &str) {
self.ingest(format!("RCPT TO:<{}>\r\n", to).as_bytes())
.await
.unwrap();
self.ingest(
if !to.starts_with("<") {
format!("RCPT TO:<{}>\r\n", to)
} else {
format!("RCPT TO:{}\r\n", to)
}
.as_bytes(),
)
.await
.unwrap();
self.response().assert_code(expected_code);
}
pub async fn send_message(&mut self, from: &str, to: &str, data: &str, expected_code: &str) {
self.mail_from(from, "250").await;
self.rcpt_to(to, "250").await;
pub async fn data(&mut self, data: &str, expected_code: &str) {
self.ingest(b"DATA\r\n").await.unwrap();
self.response().assert_code("354");
if let Some(file) = data.strip_prefix("test:") {
@ -155,13 +166,21 @@ impl Session<DummyIo> {
self.ingest(b"\r\n.\r\n").await.unwrap();
self.response().assert_code(expected_code);
}
pub async fn send_message(&mut self, from: &str, to: &[&str], data: &str, expected_code: &str) {
self.mail_from(from, "250").await;
for to in to {
self.rcpt_to(to, "250").await;
}
self.data(data, expected_code).await;
}
}
pub fn load_test_message(file: &str) -> String {
let mut test_file = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
test_file.push("resources");
test_file.push("tests");
test_file.push("inbound");
test_file.push("messages");
test_file.push(format!("{}.eml", file));
std::fs::read_to_string(test_file).unwrap()
}
@ -203,7 +222,7 @@ impl ServerInstance {
Self {
id: "smtp".to_string(),
listener_id: 1,
protocol: ServerProtocol::Smtp,
is_smtp: true,
hostname: "mx.example.org".to_string(),
greeting: b"220 mx.example.org at your service.\r\n".to_vec(),
}