Message pipelining implementation.

This commit is contained in:
Mauro D 2023-02-22 16:50:13 +00:00
parent 3fa869307b
commit 01e140f270
11 changed files with 317 additions and 28 deletions

73
Cargo.lock generated
View file

@ -706,6 +706,17 @@ version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531" checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531"
[[package]]
name = "futures-macro"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.26" version = "0.3.26"
@ -727,6 +738,7 @@ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr", "memchr",
@ -1414,6 +1426,52 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "opentelemetry"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e"
dependencies = [
"opentelemetry_api",
"opentelemetry_sdk",
]
[[package]]
name = "opentelemetry_api"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22"
dependencies = [
"fnv",
"futures-channel",
"futures-util",
"indexmap",
"js-sys",
"once_cell",
"pin-project-lite",
"thiserror",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113"
dependencies = [
"async-trait",
"crossbeam-channel",
"dashmap",
"fnv",
"futures-channel",
"futures-executor",
"futures-util",
"once_cell",
"opentelemetry_api",
"percent-encoding",
"rand",
"thiserror",
]
[[package]] [[package]]
name = "os_str_bytes" name = "os_str_bytes"
version = "6.4.1" version = "6.4.1"
@ -2124,6 +2182,7 @@ dependencies = [
"tokio-rustls", "tokio-rustls",
"tracing", "tracing",
"tracing-appender", "tracing-appender",
"tracing-opentelemetry",
"tracing-subscriber", "tracing-subscriber",
"webpki-roots", "webpki-roots",
"x509-parser", "x509-parser",
@ -2539,6 +2598,20 @@ dependencies = [
"tracing-core", "tracing-core",
] ]
[[package]]
name = "tracing-opentelemetry"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de"
dependencies = [
"once_cell",
"opentelemetry",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
]
[[package]] [[package]]
name = "tracing-subscriber" name = "tracing-subscriber"
version = "0.3.16" version = "0.3.16"

View file

@ -25,6 +25,7 @@ rayon = "1.5"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2" tracing-appender = "0.2"
tracing-opentelemetry = "0.18.0"
parking_lot = "0.12" parking_lot = "0.12"
regex = "1.7.0" regex = "1.7.0"
dashmap = "5.4" dashmap = "5.4"

View file

@ -2,7 +2,5 @@
Stalwart SMTP Server Stalwart SMTP Server
# TODO # TODO
- Spam filter
- Antivirus
- OpenTelemetry - OpenTelemetry

View file

@ -43,11 +43,21 @@ backlog = 1024
#tos = 1 #tos = 1
[global] [global]
log-level = "trace"
concurrency = 1024 concurrency = 1024
shared-map = {shard = 32, capacity = 10} shared-map = {shard = 32, capacity = 10}
#thread-pool = 8 #thread-pool = 8
[global.tracing]
method = "stdout"
level = "trace"
#[global.tracing]
#method = "log"
#path = "/var/log/smtp"
#prefix = "smtp.log"
#rotate = "daily"
#level = "trace"
[session] [session]
timeout = "5m" timeout = "5m"
transfer-limit = 262144000 # 250 MB transfer-limit = 262144000 # 250 MB
@ -112,6 +122,11 @@ wait = "5s"
[session.data] [session.data]
#script = data.sieve #script = data.sieve
#[session.data.pipe."spam-assassin"]
#command = "spamc"
#arguments = []
#timeout = "10s"
[session.data.limits] [session.data.limits]
messages = 10 messages = 10
size = 104857600 size = 104857600

View file

@ -0,0 +1,14 @@
#!/bin/bash
if [[ $1 == "hello" ]] && [[ $2 == "world" ]]; then
echo "X-My-Header: true"
while read line
do
echo "$line"
done < /dev/stdin
exit 0;
else
echo "Invalid parameters!"
exit 1;
fi

View file

@ -274,6 +274,7 @@ pub struct Rcpt {
pub struct Data { pub struct Data {
pub script: IfBlock<Option<Arc<Sieve>>>, pub script: IfBlock<Option<Arc<Sieve>>>,
pub pipe_commands: Vec<Pipe>,
// Limits // Limits
pub max_messages: IfBlock<usize>, pub max_messages: IfBlock<usize>,
@ -289,6 +290,12 @@ pub struct Data {
pub add_date: IfBlock<bool>, pub add_date: IfBlock<bool>,
} }
pub struct Pipe {
pub command: IfBlock<Option<String>>,
pub arguments: IfBlock<Vec<String>>,
pub timeout: IfBlock<Duration>,
}
pub struct SessionConfig { pub struct SessionConfig {
pub timeout: IfBlock<Duration>, pub timeout: IfBlock<Duration>,
pub duration: IfBlock<Duration>, pub duration: IfBlock<Duration>,

View file

@ -349,8 +349,31 @@ impl Config {
add_date: self add_date: self
.parse_if_block("session.data.add-headers.date", ctx, &available_keys)? .parse_if_block("session.data.add-headers.date", ctx, &available_keys)?
.unwrap_or_else(|| IfBlock::new(true)), .unwrap_or_else(|| IfBlock::new(true)),
pipe_commands: self.parse_pipes(ctx, &available_keys)?,
}) })
} }
pub fn parse_pipes(
&self,
ctx: &ConfigContext,
available_keys: &[EnvelopeKey],
) -> super::Result<Vec<Pipe>> {
let mut pipes = Vec::new();
for id in self.sub_keys("session.data.pipe") {
pipes.push(Pipe {
command: self
.parse_if_block(("session.data.pipe", id, "command"), ctx, available_keys)?
.unwrap_or_default(),
arguments: self
.parse_if_block(("session.data.pipe", id, "arguments"), ctx, available_keys)?
.unwrap_or_default(),
timeout: self
.parse_if_block(("session.data.pipe", id, "timeout"), ctx, available_keys)?
.unwrap_or_else(|| IfBlock::new(Duration::from_secs(30))),
})
}
Ok(pipes)
}
} }
impl ParseValue for MtPriority { impl ParseValue for MtPriority {

View file

@ -1,6 +1,7 @@
use std::{ use std::{
borrow::Cow, borrow::Cow,
path::PathBuf, path::PathBuf,
process::Stdio,
sync::Arc, sync::Arc,
time::{Duration, Instant, SystemTime}, time::{Duration, Instant, SystemTime},
}; };
@ -13,7 +14,10 @@ use mail_builder::headers::{date::Date, message_id::generate_message_id_header};
use smtp_proto::{ use smtp_proto::{
MAIL_BY_RETURN, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS, MAIL_BY_RETURN, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS,
}; };
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
process::Command,
};
use crate::{ use crate::{
config::DNSBL_FROM, config::DNSBL_FROM,
@ -257,11 +261,101 @@ impl<T: AsyncWrite + AsyncRead + IsTls + Unpin> Session<T> {
} }
} }
// Sieve filtering // Pipe message
let mut edited_message = None; let mut edited_message = None;
for pipe in &dc.pipe_commands {
if let Some(command_) = pipe.command.eval(self).await {
let piped_message = edited_message.as_ref().unwrap_or(&raw_message).clone();
let timeout = *pipe.timeout.eval(self).await;
let mut command = Command::new(command_);
for argument in pipe.arguments.eval(self).await {
command.arg(argument);
}
match command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.kill_on_drop(true)
.spawn()
{
Ok(mut child) => {
if let Some(mut stdin) = child.stdin.take() {
match tokio::time::timeout(timeout, stdin.write_all(&piped_message))
.await
{
Ok(Ok(_)) => {
drop(stdin);
match tokio::time::timeout(timeout, child.wait_with_output())
.await
{
Ok(Ok(output)) => {
if output.status.success()
&& !output.stdout.is_empty()
&& output.stdout[..] != piped_message[..]
{
edited_message = Arc::new(output.stdout).into();
}
tracing::debug!(parent: &self.span,
context = "pipe",
event = "success",
command = command_,
status = output.status.to_string());
}
Ok(Err(err)) => {
tracing::warn!(parent: &self.span,
context = "pipe",
event = "exec-error",
command = command_,
reason = %err);
}
Err(_) => {
tracing::warn!(parent: &self.span,
context = "pipe",
event = "timeout",
command = command_);
}
}
}
Ok(Err(err)) => {
tracing::warn!(parent: &self.span,
context = "pipe",
event = "write-error",
command = command_,
reason = %err);
}
Err(_) => {
tracing::warn!(parent: &self.span,
context = "pipe",
event = "stdin-timeout",
command = command_);
}
}
} else {
tracing::warn!(parent: &self.span,
context = "pipe",
event = "stdin-failed",
command = command_);
}
}
Err(err) => {
tracing::warn!(parent: &self.span,
context = "pipe",
event = "spawn-error",
command = command_,
reason = %err);
}
}
}
}
// Sieve filtering
if let Some(script) = dc.script.eval(self).await { if let Some(script) = dc.script.eval(self).await {
match self match self
.run_script(script.clone(), Some(raw_message.clone())) .run_script(
script.clone(),
Some(edited_message.as_ref().unwrap_or(&raw_message).clone()),
)
.await .await
{ {
ScriptResult::Accept => (), ScriptResult::Accept => (),

View file

@ -14,6 +14,7 @@ use smtp_server::{
UnwrapFailure, UnwrapFailure,
}; };
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
#[tokio::main] #[tokio::main]
@ -148,23 +149,8 @@ async fn main() -> std::io::Result<()> {
} }
} }
// Enable logging // Enable tracing
let file_appender = tracing_appender::rolling::daily("/var/log/stalwart-smtp", "smtp.log"); let _tracer = enable_tracing(&config).failed("Failed to enable tracing");
let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(
EnvFilter::builder()
.parse(&format!(
"smtp_server={}",
config.value("global.log-level").unwrap_or("info")
))
.failed("Failed to log level"),
)
.with_writer(non_blocking)
.finish(),
)
.failed("Failed to set subscriber");
tracing::info!( tracing::info!(
"Starting Stalwart SMTP server v{}...", "Starting Stalwart SMTP server v{}...",
env!("CARGO_PKG_VERSION") env!("CARGO_PKG_VERSION")
@ -241,6 +227,46 @@ async fn main() -> std::io::Result<()> {
Ok(()) Ok(())
} }
fn enable_tracing(config: &Config) -> smtp_server::config::Result<Option<WorkerGuard>> {
let level = config.value("global.tracing.level").unwrap_or("info");
let env_filter = EnvFilter::builder()
.parse(format!("smtp_server={}", level))
.failed("Failed to log level");
match config.value("global.tracing.method").unwrap_or_default() {
"log" => {
let path = config.value_require("global.tracing.path")?;
let prefix = config.value_require("global.tracing.prefix")?;
let file_appender = match config.value("global.tracing.rotate").unwrap_or("daily") {
"daily" => tracing_appender::rolling::daily(path, prefix),
"hourly" => tracing_appender::rolling::hourly(path, prefix),
"minutely" => tracing_appender::rolling::minutely(path, prefix),
_ => tracing_appender::rolling::never(path, prefix),
};
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(env_filter)
.with_writer(non_blocking)
.finish(),
)
.failed("Failed to set subscriber");
Ok(guard.into())
}
"stdout" => {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(env_filter)
.finish(),
)
.failed("Failed to set subscriber");
Ok(None)
}
_ => Ok(None),
}
}
fn parse_config() -> Config { fn parse_config() -> Config {
let mut config_path = None; let mut config_path = None;
let mut found_param = false; let mut found_param = false;

View file

@ -1,5 +1,7 @@
use std::path::PathBuf;
use crate::{ use crate::{
config::{Config, ConfigContext, IfBlock}, config::{Config, ConfigContext, EnvelopeKey, IfBlock},
core::{Core, Session}, core::{Core, Session},
tests::session::VerifyResponse, tests::session::VerifyResponse,
}; };
@ -14,6 +16,12 @@ idle-timeout = "5m"
[list] [list]
invalid-ehlos = ["spammer.org", "spammer.net"] invalid-ehlos = ["spammer.org", "spammer.net"]
[session.data.pipe."test"]
command = [ { if = "remote-ip", eq = "10.0.0.123", then = "/bin/bash" },
{ else = false } ]
arguments = ["%CFG_PATH%/pipe_me.sh", "hello", "world"]
timeout = "10s"
[sieve] [sieve]
from-name = "Sieve Daemon" from-name = "Sieve Daemon"
from-addr = "sieve@foobar.org" from-addr = "sieve@foobar.org"
@ -115,15 +123,24 @@ async fn sieve_scripts() {
) )
.unwrap();*/ .unwrap();*/
let mut pipe_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
pipe_path.push("resources");
pipe_path.push("tests");
pipe_path.push("pipe");
// Prepare config // Prepare config
let mut core = Core::test(); let mut core = Core::test();
let mut qr = core.init_test_queue("smtp_sieve_test"); let mut qr = core.init_test_queue("smtp_sieve_test");
let mut ctx = ConfigContext::default().parse_signatures(); let mut ctx = ConfigContext::default().parse_signatures();
let config = let config = Config::parse(
Config::parse(&CONFIG.replace("%PATH%", qr._temp_dir.temp_dir.as_path().to_str().unwrap())) &CONFIG
.unwrap(); .replace("%PATH%", qr._temp_dir.temp_dir.as_path().to_str().unwrap())
.replace("%CFG_PATH%", pipe_path.as_path().to_str().unwrap()),
)
.unwrap();
config.parse_lists(&mut ctx).unwrap(); config.parse_lists(&mut ctx).unwrap();
config.parse_databases(&mut ctx).unwrap(); config.parse_databases(&mut ctx).unwrap();
let pipes = config.parse_pipes(&ctx, &[EnvelopeKey::RemoteIp]).unwrap();
core.sieve = config.parse_sieve(&mut ctx).unwrap(); core.sieve = config.parse_sieve(&mut ctx).unwrap();
let config = &mut core.session.config; let config = &mut core.session.config;
config.connect.script = IfBlock::new(ctx.scripts.get("connect").cloned()); config.connect.script = IfBlock::new(ctx.scripts.get("connect").cloned());
@ -132,6 +149,7 @@ async fn sieve_scripts() {
config.rcpt.script = IfBlock::new(ctx.scripts.get("rcpt").cloned()); config.rcpt.script = IfBlock::new(ctx.scripts.get("rcpt").cloned());
config.data.script = IfBlock::new(ctx.scripts.get("data").cloned()); config.data.script = IfBlock::new(ctx.scripts.get("data").cloned());
config.rcpt.relay = IfBlock::new(true); config.rcpt.relay = IfBlock::new(true);
config.data.pipe_commands = pipes;
// Test connect script // Test connect script
let mut session = Session::test(core); let mut session = Session::test(core);
@ -259,5 +277,24 @@ async fn sieve_scripts() {
.unwrap_message() .unwrap_message()
.read_lines() .read_lines()
.assert_contains("X-Part-Number: 5") .assert_contains("X-Part-Number: 5")
.assert_contains("THIS IS A PIECE OF HTML TEXT"); .assert_contains("THIS IS A PIECE OF HTML TEXT")
.assert_not_contains("X-My-Header: true");
// Test pipes
session.data.remote_ip = "10.0.0.123".parse().unwrap();
session
.send_message(
"test@example.net",
&["pipe@foobar.com"],
"test:no_dkim",
"250",
)
.await;
qr.read_event()
.await
.unwrap_message()
.read_lines()
.assert_contains("X-My-Header: true")
.assert_contains("Authentication-Results");
qr.assert_empty_queue();
} }

View file

@ -208,6 +208,7 @@ impl SessionConfig {
add_auth_results: IfBlock::new(true), add_auth_results: IfBlock::new(true),
add_message_id: IfBlock::new(true), add_message_id: IfBlock::new(true),
add_date: IfBlock::new(true), add_date: IfBlock::new(true),
pipe_commands: vec![],
}, },
} }
} }