diff --git a/Cargo.lock b/Cargo.lock index 279b815..c664382 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -706,6 +706,17 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531" +[[package]] +name = "futures-macro" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.26" @@ -727,6 +738,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1414,6 +1426,52 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "opentelemetry" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry_api" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" +dependencies = [ + "fnv", + "futures-channel", + "futures-util", + "indexmap", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113" +dependencies = [ + "async-trait", + "crossbeam-channel", + "dashmap", + "fnv", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand", + "thiserror", +] + [[package]] name = "os_str_bytes" version = "6.4.1" @@ -2124,6 +2182,7 @@ dependencies = [ "tokio-rustls", "tracing", "tracing-appender", + "tracing-opentelemetry", "tracing-subscriber", "webpki-roots", "x509-parser", @@ -2539,6 +2598,20 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de" +dependencies = [ + "once_cell", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", +] + [[package]] name = "tracing-subscriber" version = "0.3.16" diff --git a/Cargo.toml b/Cargo.toml index 0d48cd6..341aef3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ rayon = "1.5" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-appender = "0.2" +tracing-opentelemetry = "0.18.0" parking_lot = "0.12" regex = "1.7.0" dashmap = "5.4" diff --git a/README.md b/README.md index 26cedd6..fa1914b 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,5 @@ Stalwart SMTP Server # TODO -- Spam filter -- Antivirus - OpenTelemetry diff --git a/resources/config/config.toml b/resources/config/config.toml index bae5340..a36c141 100644 --- a/resources/config/config.toml +++ b/resources/config/config.toml @@ -43,11 +43,21 @@ backlog = 1024 #tos = 1 [global] -log-level = "trace" concurrency = 1024 shared-map = {shard = 32, capacity = 10} #thread-pool = 8 +[global.tracing] +method = "stdout" +level = "trace" + +#[global.tracing] +#method = "log" +#path = "/var/log/smtp" +#prefix = "smtp.log" +#rotate = "daily" +#level = "trace" + [session] timeout = "5m" transfer-limit = 262144000 # 250 MB @@ -112,6 +122,11 @@ wait = "5s" [session.data] #script = data.sieve +#[session.data.pipe."spam-assassin"] +#command = "spamc" +#arguments = [] +#timeout = "10s" + [session.data.limits] messages = 10 size = 104857600 diff --git a/resources/tests/pipe/pipe_me.sh b/resources/tests/pipe/pipe_me.sh new file mode 100644 index 0000000..d3d8c6e --- /dev/null +++ b/resources/tests/pipe/pipe_me.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +if [[ $1 == "hello" ]] && [[ $2 == "world" ]]; then + echo "X-My-Header: true" + while read line + do + echo "$line" + done < /dev/stdin + exit 0; +else + echo "Invalid parameters!" + exit 1; +fi + diff --git a/src/config/mod.rs b/src/config/mod.rs index 16eddf1..6f8ad4d 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -274,6 +274,7 @@ pub struct Rcpt { pub struct Data { pub script: IfBlock>>, + pub pipe_commands: Vec, // Limits pub max_messages: IfBlock, @@ -289,6 +290,12 @@ pub struct Data { pub add_date: IfBlock, } +pub struct Pipe { + pub command: IfBlock>, + pub arguments: IfBlock>, + pub timeout: IfBlock, +} + pub struct SessionConfig { pub timeout: IfBlock, pub duration: IfBlock, diff --git a/src/config/session.rs b/src/config/session.rs index e27da24..083cec9 100644 --- a/src/config/session.rs +++ b/src/config/session.rs @@ -349,8 +349,31 @@ impl Config { add_date: self .parse_if_block("session.data.add-headers.date", ctx, &available_keys)? .unwrap_or_else(|| IfBlock::new(true)), + pipe_commands: self.parse_pipes(ctx, &available_keys)?, }) } + + pub fn parse_pipes( + &self, + ctx: &ConfigContext, + available_keys: &[EnvelopeKey], + ) -> super::Result> { + let mut pipes = Vec::new(); + for id in self.sub_keys("session.data.pipe") { + pipes.push(Pipe { + command: self + .parse_if_block(("session.data.pipe", id, "command"), ctx, available_keys)? + .unwrap_or_default(), + arguments: self + .parse_if_block(("session.data.pipe", id, "arguments"), ctx, available_keys)? + .unwrap_or_default(), + timeout: self + .parse_if_block(("session.data.pipe", id, "timeout"), ctx, available_keys)? + .unwrap_or_else(|| IfBlock::new(Duration::from_secs(30))), + }) + } + Ok(pipes) + } } impl ParseValue for MtPriority { diff --git a/src/inbound/data.rs b/src/inbound/data.rs index 9a0aa9c..e3d8634 100644 --- a/src/inbound/data.rs +++ b/src/inbound/data.rs @@ -1,6 +1,7 @@ use std::{ borrow::Cow, path::PathBuf, + process::Stdio, sync::Arc, time::{Duration, Instant, SystemTime}, }; @@ -13,7 +14,10 @@ use mail_builder::headers::{date::Date, message_id::generate_message_id_header}; use smtp_proto::{ MAIL_BY_RETURN, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS, }; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::{ + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, + process::Command, +}; use crate::{ config::DNSBL_FROM, @@ -257,11 +261,101 @@ impl Session { } } - // Sieve filtering + // Pipe message let mut edited_message = None; + for pipe in &dc.pipe_commands { + if let Some(command_) = pipe.command.eval(self).await { + let piped_message = edited_message.as_ref().unwrap_or(&raw_message).clone(); + let timeout = *pipe.timeout.eval(self).await; + + let mut command = Command::new(command_); + for argument in pipe.arguments.eval(self).await { + command.arg(argument); + } + match command + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .kill_on_drop(true) + .spawn() + { + Ok(mut child) => { + if let Some(mut stdin) = child.stdin.take() { + match tokio::time::timeout(timeout, stdin.write_all(&piped_message)) + .await + { + Ok(Ok(_)) => { + drop(stdin); + match tokio::time::timeout(timeout, child.wait_with_output()) + .await + { + Ok(Ok(output)) => { + if output.status.success() + && !output.stdout.is_empty() + && output.stdout[..] != piped_message[..] + { + edited_message = Arc::new(output.stdout).into(); + } + + tracing::debug!(parent: &self.span, + context = "pipe", + event = "success", + command = command_, + status = output.status.to_string()); + } + Ok(Err(err)) => { + tracing::warn!(parent: &self.span, + context = "pipe", + event = "exec-error", + command = command_, + reason = %err); + } + Err(_) => { + tracing::warn!(parent: &self.span, + context = "pipe", + event = "timeout", + command = command_); + } + } + } + Ok(Err(err)) => { + tracing::warn!(parent: &self.span, + context = "pipe", + event = "write-error", + command = command_, + reason = %err); + } + Err(_) => { + tracing::warn!(parent: &self.span, + context = "pipe", + event = "stdin-timeout", + command = command_); + } + } + } else { + tracing::warn!(parent: &self.span, + context = "pipe", + event = "stdin-failed", + command = command_); + } + } + Err(err) => { + tracing::warn!(parent: &self.span, + context = "pipe", + event = "spawn-error", + command = command_, + reason = %err); + } + } + } + } + + // Sieve filtering if let Some(script) = dc.script.eval(self).await { match self - .run_script(script.clone(), Some(raw_message.clone())) + .run_script( + script.clone(), + Some(edited_message.as_ref().unwrap_or(&raw_message).clone()), + ) .await { ScriptResult::Accept => (), diff --git a/src/main.rs b/src/main.rs index 4bac27b..af50332 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use smtp_server::{ UnwrapFailure, }; use tokio::sync::{mpsc, watch}; +use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -148,23 +149,8 @@ async fn main() -> std::io::Result<()> { } } - // Enable logging - let file_appender = tracing_appender::rolling::daily("/var/log/stalwart-smtp", "smtp.log"); - let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); - tracing::subscriber::set_global_default( - tracing_subscriber::FmtSubscriber::builder() - .with_env_filter( - EnvFilter::builder() - .parse(&format!( - "smtp_server={}", - config.value("global.log-level").unwrap_or("info") - )) - .failed("Failed to log level"), - ) - .with_writer(non_blocking) - .finish(), - ) - .failed("Failed to set subscriber"); + // Enable tracing + let _tracer = enable_tracing(&config).failed("Failed to enable tracing"); tracing::info!( "Starting Stalwart SMTP server v{}...", env!("CARGO_PKG_VERSION") @@ -241,6 +227,46 @@ async fn main() -> std::io::Result<()> { Ok(()) } +fn enable_tracing(config: &Config) -> smtp_server::config::Result> { + let level = config.value("global.tracing.level").unwrap_or("info"); + let env_filter = EnvFilter::builder() + .parse(format!("smtp_server={}", level)) + .failed("Failed to log level"); + match config.value("global.tracing.method").unwrap_or_default() { + "log" => { + let path = config.value_require("global.tracing.path")?; + let prefix = config.value_require("global.tracing.prefix")?; + let file_appender = match config.value("global.tracing.rotate").unwrap_or("daily") { + "daily" => tracing_appender::rolling::daily(path, prefix), + "hourly" => tracing_appender::rolling::hourly(path, prefix), + "minutely" => tracing_appender::rolling::minutely(path, prefix), + _ => tracing_appender::rolling::never(path, prefix), + }; + + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(env_filter) + .with_writer(non_blocking) + .finish(), + ) + .failed("Failed to set subscriber"); + Ok(guard.into()) + } + "stdout" => { + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(env_filter) + .finish(), + ) + .failed("Failed to set subscriber"); + + Ok(None) + } + _ => Ok(None), + } +} + fn parse_config() -> Config { let mut config_path = None; let mut found_param = false; diff --git a/src/tests/inbound/scripts.rs b/src/tests/inbound/scripts.rs index af0bb28..9e702a4 100644 --- a/src/tests/inbound/scripts.rs +++ b/src/tests/inbound/scripts.rs @@ -1,5 +1,7 @@ +use std::path::PathBuf; + use crate::{ - config::{Config, ConfigContext, IfBlock}, + config::{Config, ConfigContext, EnvelopeKey, IfBlock}, core::{Core, Session}, tests::session::VerifyResponse, }; @@ -14,6 +16,12 @@ idle-timeout = "5m" [list] invalid-ehlos = ["spammer.org", "spammer.net"] +[session.data.pipe."test"] +command = [ { if = "remote-ip", eq = "10.0.0.123", then = "/bin/bash" }, + { else = false } ] +arguments = ["%CFG_PATH%/pipe_me.sh", "hello", "world"] +timeout = "10s" + [sieve] from-name = "Sieve Daemon" from-addr = "sieve@foobar.org" @@ -115,15 +123,24 @@ async fn sieve_scripts() { ) .unwrap();*/ + let mut pipe_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + pipe_path.push("resources"); + pipe_path.push("tests"); + pipe_path.push("pipe"); + // Prepare config let mut core = Core::test(); let mut qr = core.init_test_queue("smtp_sieve_test"); let mut ctx = ConfigContext::default().parse_signatures(); - let config = - Config::parse(&CONFIG.replace("%PATH%", qr._temp_dir.temp_dir.as_path().to_str().unwrap())) - .unwrap(); + let config = Config::parse( + &CONFIG + .replace("%PATH%", qr._temp_dir.temp_dir.as_path().to_str().unwrap()) + .replace("%CFG_PATH%", pipe_path.as_path().to_str().unwrap()), + ) + .unwrap(); config.parse_lists(&mut ctx).unwrap(); config.parse_databases(&mut ctx).unwrap(); + let pipes = config.parse_pipes(&ctx, &[EnvelopeKey::RemoteIp]).unwrap(); core.sieve = config.parse_sieve(&mut ctx).unwrap(); let config = &mut core.session.config; config.connect.script = IfBlock::new(ctx.scripts.get("connect").cloned()); @@ -132,6 +149,7 @@ async fn sieve_scripts() { config.rcpt.script = IfBlock::new(ctx.scripts.get("rcpt").cloned()); config.data.script = IfBlock::new(ctx.scripts.get("data").cloned()); config.rcpt.relay = IfBlock::new(true); + config.data.pipe_commands = pipes; // Test connect script let mut session = Session::test(core); @@ -259,5 +277,24 @@ async fn sieve_scripts() { .unwrap_message() .read_lines() .assert_contains("X-Part-Number: 5") - .assert_contains("THIS IS A PIECE OF HTML TEXT"); + .assert_contains("THIS IS A PIECE OF HTML TEXT") + .assert_not_contains("X-My-Header: true"); + + // Test pipes + session.data.remote_ip = "10.0.0.123".parse().unwrap(); + session + .send_message( + "test@example.net", + &["pipe@foobar.com"], + "test:no_dkim", + "250", + ) + .await; + qr.read_event() + .await + .unwrap_message() + .read_lines() + .assert_contains("X-My-Header: true") + .assert_contains("Authentication-Results"); + qr.assert_empty_queue(); } diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 4bf72dc..88e455f 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -208,6 +208,7 @@ impl SessionConfig { add_auth_results: IfBlock::new(true), add_message_id: IfBlock::new(true), add_date: IfBlock::new(true), + pipe_commands: vec![], }, } }