Store incoming reports in the data store

mdecimus 2024-03-02 12:21:03 +01:00
parent 417bc38288
commit 875b1fa744
40 changed files with 1992 additions and 1541 deletions

Cargo.lock (generated), 609 lines changed: file diff suppressed because it is too large.

View file

@ -29,3 +29,4 @@ human-size = "0.4.2"
futures = "0.3.28"
pwhash = "1.0.0"
rand = "0.8.5"
mail-auth = "0.3.7"

View file

@ -208,6 +208,20 @@ impl Client {
url: &str,
body: Option<B>,
) -> R {
self.try_http_request(method, url, body)
.await
.unwrap_or_else(|| {
eprintln!("Request failed: No data returned.");
std::process::exit(1);
})
}
pub async fn try_http_request<R: DeserializeOwned, B: Serialize>(
&self,
method: Method,
url: &str,
body: Option<B>,
) -> Option<R> {
let url = format!(
"{}{}{}",
self.url,
@ -240,6 +254,9 @@ impl Client {
match response.status() {
StatusCode::OK => (),
StatusCode::NOT_FOUND => {
return None;
}
StatusCode::UNAUTHORIZED => {
eprintln!("Authentication failed. Make sure the credentials are correct and that the account has administrator rights.");
std::process::exit(1);
@ -258,7 +275,7 @@ impl Client {
)
.unwrap_result("deserialize response")
{
Response::Data { data } => data,
Response::Data { data } => Some(data),
Response::Error { error, details } => {
eprintln!("Request failed: {details} ({error:?})");
std::process::exit(1);
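
The net effect of this hunk: `http_request` keeps the old fail-fast behavior, while the new `try_http_request` maps HTTP 404 to `None` so callers can handle missing resources. A sketch of the resulting call pattern (not part of the commit; `Message` and `List` are the types used elsewhere in this diff):

// 404 -> None; authentication and server errors still exit the process.
let message: Option<Message> = client
    .try_http_request(Method::GET, "/api/queue/messages/1234", None::<String>)
    .await;

// http_request unwraps the Option and exits when nothing comes back.
let list: List<u64> = client
    .http_request(Method::GET, "/api/queue/messages", None::<String>)
    .await;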

View file

@ -117,6 +117,12 @@ pub enum PrincipalField {
Members,
}
#[derive(Clone, serde::Serialize, serde::Deserialize, Default)]
pub struct List<T> {
pub items: Vec<T>,
pub total: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PrincipalUpdate {
action: PrincipalAction,
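
`List<T>` is the new envelope for paginated management responses: endpoints that previously returned a bare `Vec` now return `{"items": [...], "total": n}`. A sketch of deserializing it (assumes `serde_json`; payload illustrative):

let list: List<u64> = serde_json::from_str(r#"{"items":[1,2,3],"total":3}"#).unwrap();
assert_eq!(list.items, vec![1, 2, 3]);
assert_eq!(list.total, 3);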

View file

@ -21,7 +21,10 @@
* for more details.
*/
use super::cli::{Client, QueueCommands};
use super::{
cli::{Client, QueueCommands},
List,
};
use console::Term;
use human_size::{Byte, SpecificSize};
use mail_parser::DateTime;
@ -99,65 +102,62 @@ impl QueueCommands {
.map(|p| Cell::new(p).with_style(Attr::Bold))
.collect(),
));
for (message, id) in client
.http_request::<Vec<Option<Message>>, String>(
Method::GET,
&build_query("/api/queue/status?ids=", chunk),
None,
)
.await
.into_iter()
.zip(chunk)
{
if let Some(message) = message {
let mut rcpts = String::new();
let mut deliver_at = i64::MAX;
let mut deliver_pos = 0;
for (pos, domain) in message.domains.iter().enumerate() {
if let Some(next_retry) = &domain.next_retry {
let ts = next_retry.to_timestamp();
if ts < deliver_at {
deliver_at = ts;
deliver_pos = pos;
}
}
for rcpt in &domain.recipients {
if !rcpts.is_empty() {
rcpts.push('\n');
}
rcpts.push_str(&rcpt.address);
rcpts.push_str(" (");
rcpts.push_str(rcpt.status.status_short());
rcpts.push(')');
for id in chunk {
let message = client
.http_request::<Message, String>(
Method::GET,
&format!("/api/queue/messages/{id}"),
None,
)
.await;
let mut rcpts = String::new();
let mut deliver_at = i64::MAX;
let mut deliver_pos = 0;
for (pos, domain) in message.domains.iter().enumerate() {
if let Some(next_retry) = &domain.next_retry {
let ts = next_retry.to_timestamp();
if ts < deliver_at {
deliver_at = ts;
deliver_pos = pos;
}
}
let mut cells = Vec::new();
cells.push(Cell::new(&format!("{id:X}")));
cells.push(if deliver_at != i64::MAX {
Cell::new(
&message.domains[deliver_pos]
.next_retry
.as_ref()
.unwrap()
.to_rfc822(),
)
} else {
Cell::new("None")
});
cells.push(Cell::new(if !message.return_path.is_empty() {
&message.return_path
} else {
"<>"
}));
cells.push(Cell::new(&rcpts));
cells.push(Cell::new(
&SpecificSize::new(message.size as u32, Byte)
.unwrap()
.to_string(),
));
table.add_row(Row::new(cells));
for rcpt in &domain.recipients {
if !rcpts.is_empty() {
rcpts.push('\n');
}
rcpts.push_str(&rcpt.address);
rcpts.push_str(" (");
rcpts.push_str(rcpt.status.status_short());
rcpts.push(')');
}
}
let mut cells = Vec::new();
cells.push(Cell::new(&format!("{id:X}")));
cells.push(if deliver_at != i64::MAX {
Cell::new(
&message.domains[deliver_pos]
.next_retry
.as_ref()
.unwrap()
.to_rfc822(),
)
} else {
Cell::new("None")
});
cells.push(Cell::new(if !message.return_path.is_empty() {
&message.return_path
} else {
"<>"
}));
cells.push(Cell::new(&rcpts));
cells.push(Cell::new(
&SpecificSize::new(message.size as u32, Byte)
.unwrap()
.to_string(),
));
table.add_row(Row::new(cells));
}
eprintln!();
@ -173,21 +173,20 @@ impl QueueCommands {
eprintln!("\n{ids_len} queued message(s) found.")
}
QueueCommands::Status { ids } => {
for (message, id) in client
.http_request::<Vec<Option<Message>>, String>(
Method::GET,
&build_query("/api/queue/status?ids=", &parse_ids(&ids)),
None,
)
.await
.into_iter()
.zip(&ids)
{
for (uid, id) in parse_ids(&ids).into_iter().zip(ids) {
let message = client
.try_http_request::<Message, String>(
Method::GET,
&format!("/api/queue/messages/{uid}"),
None,
)
.await;
let mut table = Table::new();
table.add_row(Row::new(vec![
Cell::new("ID").with_style(Attr::Bold),
Cell::new(id),
Cell::new(&id),
]));
if let Some(message) = message {
table.add_row(Row::new(vec![
Cell::new("Sender").with_style(Attr::Bold),
@ -316,30 +315,31 @@ impl QueueCommands {
std::process::exit(1);
}
let mut query = form_urlencoded::Serializer::new("/api/queue/retry?".to_string());
if let Some(filter) = &domain {
query.append_pair("filter", filter);
}
if let Some(at) = time {
query.append_pair("at", &at.to_rfc3339());
}
query.append_pair("ids", &append_ids(String::new(), &parsed_ids));
let mut success_count = 0;
let mut failed_list = vec![];
for (success, id) in client
.http_request::<Vec<bool>, String>(Method::GET, &query.finish(), None)
.await
.into_iter()
.zip(ids)
{
if success {
for id in parsed_ids {
let mut query =
form_urlencoded::Serializer::new(format!("/api/queue/messages/{id}"));
if let Some(filter) = &domain {
query.append_pair("filter", filter);
}
if let Some(at) = time {
query.append_pair("at", &at.to_rfc3339());
}
if client
.try_http_request::<bool, String>(Method::PATCH, &query.finish(), None)
.await
.unwrap_or(false)
{
success_count += 1;
} else {
failed_list.push(id);
failed_list.push(id.to_string());
}
}
eprint!("\nSuccessfully rescheduled {success_count} message(s).");
if !failed_list.is_empty() {
eprint!(" Unable to reschedule id(s): {}.", failed_list.join(", "));
@ -371,27 +371,28 @@ impl QueueCommands {
std::process::exit(1);
}
let mut query = form_urlencoded::Serializer::new("/api/queue/cancel?".to_string());
if let Some(filter) = &rcpt {
query.append_pair("filter", filter);
}
query.append_pair("ids", &append_ids(String::new(), &parsed_ids));
let mut success_count = 0;
let mut failed_list = vec![];
for (success, id) in client
.http_request::<Vec<bool>, String>(Method::GET, &query.finish(), None)
.await
.into_iter()
.zip(ids)
{
if success {
for id in parsed_ids {
let mut query =
form_urlencoded::Serializer::new(format!("/api/queue/messages/{id}"));
if let Some(filter) = &rcpt {
query.append_pair("filter", filter);
}
if client
.try_http_request::<bool, String>(Method::DELETE, &query.finish(), None)
.await
.unwrap_or(false)
{
success_count += 1;
} else {
failed_list.push(id);
failed_list.push(id.to_string());
}
}
eprint!("\nCancelled delivery of {success_count} message(s).");
if !failed_list.is_empty() {
eprint!(
@ -413,7 +414,7 @@ impl Client {
before: &Option<DateTime>,
after: &Option<DateTime>,
) -> Vec<u64> {
let mut query = form_urlencoded::Serializer::new("/api/queue/list?".to_string());
let mut query = form_urlencoded::Serializer::new("/api/queue/messages".to_string());
if let Some(sender) = from {
query.append_pair("from", sender);
@ -428,8 +429,9 @@ impl Client {
query.append_pair("after", &after.to_rfc3339());
}
self.http_request::<Vec<u64>, String>(Method::GET, &query.finish(), None)
self.http_request::<List<u64>, String>(Method::GET, &query.finish(), None)
.await
.items
}
}
@ -479,22 +481,6 @@ fn parse_ids(ids: &[String]) -> Vec<u64> {
result
}
fn build_query(path: &str, ids: &[u64]) -> String {
let mut query = String::with_capacity(path.len() + (ids.len() * 10));
query.push_str(path);
append_ids(query, ids)
}
fn append_ids(mut query: String, ids: &[u64]) -> String {
for (pos, id) in ids.iter().enumerate() {
if pos != 0 {
query.push(',');
}
query.push_str(&id.to_string());
}
query
}
impl Status {
fn status_short(&self) -> &str {
match self {

View file

@ -22,24 +22,83 @@
*/
use super::cli::{Client, ReportCommands, ReportFormat};
use crate::modules::queue::deserialize_datetime;
use crate::modules::{queue::deserialize_datetime, List};
use console::Term;
use human_size::{Byte, SpecificSize};
use mail_auth::{
dmarc::URI,
mta_sts::ReportUri,
report::{self, tlsrpt::TlsReport},
};
use mail_parser::DateTime;
use prettytable::{format::Alignment, Attr, Cell, Row, Table};
use prettytable::{format, Attr, Cell, Row, Table};
use reqwest::Method;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize)]
pub struct Report {
pub domain: String,
#[serde(rename = "type")]
pub type_: ReportFormat,
#[serde(deserialize_with = "deserialize_datetime")]
pub range_from: DateTime,
#[serde(deserialize_with = "deserialize_datetime")]
pub range_to: DateTime,
pub size: usize,
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Report {
Tls {
id: String,
domain: String,
#[serde(deserialize_with = "deserialize_datetime")]
range_from: DateTime,
#[serde(deserialize_with = "deserialize_datetime")]
range_to: DateTime,
report: TlsReport,
rua: Vec<ReportUri>,
},
Dmarc {
id: String,
domain: String,
#[serde(deserialize_with = "deserialize_datetime")]
range_from: DateTime,
#[serde(deserialize_with = "deserialize_datetime")]
range_to: DateTime,
report: report::Report,
rua: Vec<URI>,
},
}
impl Report {
pub fn domain(&self) -> &str {
match self {
Report::Tls { domain, .. } => domain,
Report::Dmarc { domain, .. } => domain,
}
}
pub fn type_(&self) -> &str {
match self {
Report::Tls { .. } => "TLS",
Report::Dmarc { .. } => "DMARC",
}
}
pub fn range_from(&self) -> &DateTime {
match self {
Report::Tls { range_from, .. } => range_from,
Report::Dmarc { range_from, .. } => range_from,
}
}
pub fn range_to(&self) -> &DateTime {
match self {
Report::Tls { range_to, .. } => range_to,
Report::Dmarc { range_to, .. } => range_to,
}
}
pub fn num_records(&self) -> usize {
match self {
Report::Tls { report, .. } => report
.policies
.iter()
.map(|p| p.failure_details.len())
.sum(),
Report::Dmarc { report, .. } => report.records().len(),
}
}
}
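
Because `Report` is now an internally tagged enum (`#[serde(tag = "type")]`), shared fields are reached through the accessors above instead of struct fields. A sketch of how the listing code below consumes them (the function itself is illustrative):

fn summarize(report: &Report) -> String {
    format!(
        "{} report for {}: {} record(s), {} to {}",
        report.type_(),
        report.domain(),
        report.num_records(),
        report.range_from().to_rfc822(),
        report.range_to().to_rfc822()
    )
}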
impl ReportCommands {
@ -51,7 +110,7 @@ impl ReportCommands {
page_size,
} => {
let stdout = Term::buffered_stdout();
let mut query = form_urlencoded::Serializer::new("/api/report/list?".to_string());
let mut query = form_urlencoded::Serializer::new("/api/queue/reports".to_string());
if let Some(domain) = &domain {
query.append_pair("domain", domain);
@ -61,8 +120,9 @@ impl ReportCommands {
}
let ids = client
.http_request::<Vec<String>, String>(Method::GET, &query.finish(), None)
.await;
.http_request::<List<String>, String>(Method::GET, &query.finish(), None)
.await
.items;
let ids_len = ids.len();
let page_size = page_size.map(|p| std::cmp::max(p, 1)).unwrap_or(20);
let pages_total = (ids_len as f64 / page_size as f64).ceil() as usize;
@ -70,30 +130,29 @@ impl ReportCommands {
// Build table
let mut table = Table::new();
table.add_row(Row::new(
["ID", "Domain", "Type", "From Date", "To Date", "Size"]
["ID", "Domain", "Type", "From Date", "To Date", "Records"]
.iter()
.map(|p| Cell::new(p).with_style(Attr::Bold))
.collect(),
));
for (report, id) in client
.http_request::<Vec<Option<Report>>, String>(
Method::GET,
&format!("/api/report/status?ids={}", chunk.join(",")),
None,
)
.await
.into_iter()
.zip(chunk)
{
for id in chunk {
let report = client
.try_http_request::<Report, String>(
Method::GET,
&format!("/api/queue/reports/{id}"),
None,
)
.await;
if let Some(report) = report {
table.add_row(Row::new(vec![
Cell::new(id),
Cell::new(&report.domain),
Cell::new(report.type_.name()),
Cell::new(&report.range_from.to_rfc822()),
Cell::new(&report.range_to.to_rfc822()),
Cell::new(report.domain()),
Cell::new(report.type_()),
Cell::new(&report.range_from().to_rfc822()),
Cell::new(&report.range_to().to_rfc822()),
Cell::new(
&SpecificSize::new(report.size as u32, Byte)
&SpecificSize::new(report.num_records() as u32, Byte)
.unwrap()
.to_string(),
),
@ -114,42 +173,41 @@ impl ReportCommands {
eprintln!("\n{ids_len} queued message(s) found.")
}
ReportCommands::Status { ids } => {
for (report, id) in client
.http_request::<Vec<Option<Report>>, String>(
Method::GET,
&format!("/api/report/status?ids={}", ids.join(",")),
None,
)
.await
.into_iter()
.zip(&ids)
{
for id in ids {
let report = client
.try_http_request::<Report, String>(
Method::GET,
&format!("/api/queue/reports/{id}"),
None,
)
.await;
let mut table = Table::new();
table.add_row(Row::new(vec![
Cell::new("ID").with_style(Attr::Bold),
Cell::new(id),
Cell::new(&id),
]));
if let Some(report) = report {
table.add_row(Row::new(vec![
Cell::new("Domain Name").with_style(Attr::Bold),
Cell::new(&report.domain),
Cell::new(report.domain()),
]));
table.add_row(Row::new(vec![
Cell::new("Type").with_style(Attr::Bold),
Cell::new(report.type_.name()),
Cell::new(report.type_()),
]));
table.add_row(Row::new(vec![
Cell::new("From Date").with_style(Attr::Bold),
Cell::new(&report.range_from.to_rfc822()),
Cell::new(&report.range_from().to_rfc822()),
]));
table.add_row(Row::new(vec![
Cell::new("To Date").with_style(Attr::Bold),
Cell::new(&report.range_to.to_rfc822()),
Cell::new(&report.range_to().to_rfc822()),
]));
table.add_row(Row::new(vec![
Cell::new("Size").with_style(Attr::Bold),
Cell::new("Records").with_style(Attr::Bold),
Cell::new(
&SpecificSize::new(report.size as u32, Byte)
&SpecificSize::new(report.num_records() as u32, Byte)
.unwrap()
.to_string(),
),
@ -157,7 +215,7 @@ impl ReportCommands {
} else {
table.add_row(Row::new(vec![Cell::new_align(
"-- Not found --",
Alignment::CENTER,
format::Alignment::CENTER,
)
.with_hspan(2)]));
}
@ -170,17 +228,16 @@ impl ReportCommands {
ReportCommands::Cancel { ids } => {
let mut success_count = 0;
let mut failed_list = vec![];
for (success, id) in client
.http_request::<Vec<bool>, String>(
Method::GET,
&format!("/api/report/cancel?ids={}", ids.join(",")),
None,
)
.await
.into_iter()
.zip(ids)
{
if success {
for id in ids {
let success = client
.try_http_request::<bool, String>(
Method::DELETE,
&format!("/api/queue/reports/{id}"),
None,
)
.await;
if success.unwrap_or_default() {
success_count += 1;
} else {
failed_list.push(id);
@ -206,11 +263,4 @@ impl ReportFormat {
ReportFormat::Tls => "tls",
}
}
fn name(&self) -> &'static str {
match self {
ReportFormat::Dmarc => "DMARC",
ReportFormat::Tls => "TLS",
}
}
}

View file

@ -346,7 +346,7 @@ impl JMAP {
}
("store", Some("maintenance"), &Method::GET) => {
match self.store.purge_blobs(self.blob_store.clone()).await {
Ok(_) => match self.store.purge_bitmaps().await {
Ok(_) => match self.store.purge_store().await {
Ok(_) => JsonResponse::new(json!({
"data": (),
}))
@ -456,9 +456,9 @@ impl JMAP {
}
}
("oauth", _, _) => self.handle_api_request(req, body, access_token).await,
(path_1 @ ("queue" | "report"), Some(path_2), &Method::GET) => {
(path_1 @ ("queue" | "reports"), Some(path_2), &Method::GET) => {
self.smtp
.handle_manage_request(req.uri(), req.method(), path_1, path_2)
.handle_manage_request(req.uri(), req.method(), path_1, path_2, path.next())
.await
}
_ => RequestError::not_found().into_http_response(),
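
For orientation, the per-resource routes implied by this hunk and the CLI changes above (methods and paths all appear in the diff; the summary itself is a sketch):

// GET    /api/queue/messages        -> List<u64> of queued message ids
// GET    /api/queue/messages/{id}   -> Message details
// PATCH  /api/queue/messages/{id}   -> retry (optional ?filter= and ?at=)
// DELETE /api/queue/messages/{id}   -> cancel delivery (optional ?filter=)
// GET    /api/queue/reports         -> List<String> of report ids
// GET    /api/queue/reports/{id}    -> Report details
// DELETE /api/queue/reports/{id}    -> cancel report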

View file

@ -44,7 +44,7 @@ dashmap = "5.4"
blake3 = "1.3"
lru-cache = "0.1.2"
rand = "0.8.5"
x509-parser = "0.15.0"
x509-parser = "0.16.0"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-webpki-roots", "blocking"] }
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1.0"
@ -53,7 +53,7 @@ lazy_static = "1.4"
whatlang = "0.16"
imagesize = "0.12"
idna = "0.5"
decancer = "1.6.1"
decancer = "3.0.1"
unicode-security = "0.1.0"
infer = "0.15.0"
bincode = "1.3.1"

View file

@ -30,12 +30,7 @@ pub mod session;
pub mod shared;
pub mod throttle;
use std::{
net::SocketAddr,
path::PathBuf,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use ahash::AHashMap;
use directory::Directories;
@ -49,6 +44,7 @@ use store::Stores;
use utils::{
config::{if_block::IfBlock, utils::ConstantValue, Rate, Server, ServerProtocol},
expr::{Expression, Token},
snowflake::SnowflakeIdGenerator,
};
use crate::{
@ -250,8 +246,8 @@ pub struct ReportConfig {
pub struct ReportAnalysis {
pub addresses: Vec<AddressMatch>,
pub forward: bool,
pub store: Option<PathBuf>,
pub report_id: AtomicU64,
pub store: Option<Duration>,
pub report_id: SnowflakeIdGenerator,
}
pub enum AddressMatch {

View file

@ -31,6 +31,7 @@ use utils::{
Config,
},
expr::{Constant, Variable},
snowflake::SnowflakeIdGenerator,
};
use super::{
@ -98,7 +99,10 @@ impl ConfigReport for Config {
addresses,
forward: self.property("report.analysis.forward")?.unwrap_or(false),
store: self.property("report.analysis.store")?,
report_id: 0.into(),
report_id: self
.property::<u64>("storage.cluster.node-id")?
.map(SnowflakeIdGenerator::with_node_id)
.unwrap_or_else(SnowflakeIdGenerator::new),
},
})
}
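
`report.analysis.store` is now a retention duration rather than a filesystem path, and report ids come from a Snowflake generator seeded with the cluster node id, so ids stay unique across nodes and roughly time-ordered. The fallback used later in this commit, as a one-line sketch:

// generate() returns an Option (e.g. on clock skew); the analysis code
// falls back to the expiry timestamp as the id.
let id = report_id.generate().unwrap_or(expires);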

File diff suppressed because it is too large.

View file

@ -25,7 +25,7 @@ use std::{
borrow::Cow,
collections::hash_map::Entry,
io::{Cursor, Read},
sync::{atomic::Ordering, Arc},
sync::Arc,
time::SystemTime,
};
@ -37,6 +37,12 @@ use mail_auth::{
};
use mail_parser::{DateTime, MessageParser, MimeHeaders, PartType};
use store::{
write::{now, BatchBuilder, Bincode, ReportClass, ValueClass},
Serialize,
};
use tokio::runtime::Handle;
use crate::core::SMTP;
enum Compression {
@ -45,18 +51,26 @@ enum Compression {
Zip,
}
enum Format {
Dmarc,
Tls,
Arf,
enum Format<D, T, A> {
Dmarc(D),
Tls(T),
Arf(A),
}
struct ReportData<'x> {
compression: Compression,
format: Format,
format: Format<(), (), ()>,
data: &'x [u8],
}
#[derive(serde::Serialize, serde::Deserialize)]
pub struct IncomingReport<T> {
pub from: String,
pub to: Vec<String>,
pub subject: String,
pub report: T,
}
pub trait AnalyzeReport {
fn analyze_report(&self, message: Arc<Vec<u8>>);
}
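
Making `Format` generic lets one shape serve both passes: MIME scanning produces a `Format<(), (), ()>` tag, and parsing later replaces the unit payloads with the parsed reports. A reduced, self-contained sketch with stand-in payload types:

enum Format<D, T, A> {
    Dmarc(D),
    Tls(T),
    Arf(A),
}

fn parse(detected: Format<(), (), ()>) -> Format<&'static str, &'static str, &'static str> {
    match detected {
        // Pass 1 produced the unit tag; pass 2 fills the payload in.
        Format::Dmarc(()) => Format::Dmarc("parsed DMARC report"),
        Format::Tls(()) => Format::Tls("parsed TLS report"),
        Format::Arf(()) => Format::Arf("parsed ARF feedback"),
    }
}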
@ -64,6 +78,7 @@ pub trait AnalyzeReport {
impl AnalyzeReport for Arc<SMTP> {
fn analyze_report(&self, message: Arc<Vec<u8>>) {
let core = self.clone();
let handle = Handle::current();
self.worker_pool.spawn(move || {
let message = if let Some(message) = MessageParser::default().parse(message.as_ref()) {
message
@ -75,7 +90,15 @@ impl AnalyzeReport for Arc<SMTP> {
.from()
.and_then(|a| a.last())
.and_then(|a| a.address())
.unwrap_or("unknown");
.unwrap_or_default()
.to_string();
let to = message.to().map_or_else(Vec::new, |a| {
a.iter()
.filter_map(|a| a.address())
.map(|a| a.to_string())
.collect()
});
let subject = message.subject().unwrap_or_default().to_string();
let mut reports = Vec::new();
for part in &message.parts {
@ -92,13 +115,13 @@ impl AnalyzeReport for Arc<SMTP> {
{
reports.push(ReportData {
compression: Compression::None,
format: Format::Dmarc,
format: Format::Dmarc(()),
data: report.as_bytes(),
});
} else if part.is_content_type("message", "feedback-report") {
reports.push(ReportData {
compression: Compression::None,
format: Format::Arf,
format: Format::Arf(()),
data: report.as_bytes(),
});
}
@ -107,7 +130,7 @@ impl AnalyzeReport for Arc<SMTP> {
if part.is_content_type("message", "feedback-report") {
reports.push(ReportData {
compression: Compression::None,
format: Format::Arf,
format: Format::Arf(()),
data: report.as_ref(),
});
continue;
@ -131,13 +154,13 @@ impl AnalyzeReport for Arc<SMTP> {
_ => Compression::None,
};
let format = match (tls_parts.map(|(c, _)| c).unwrap_or(subtype), ext) {
("xml", _) => Format::Dmarc,
("tlsrpt", _) | (_, "json") => Format::Tls,
("xml", _) => Format::Dmarc(()),
("tlsrpt", _) | (_, "json") => Format::Tls(()),
_ => {
if attachment_name
.map_or(false, |n| n.contains(".xml") || n.contains('!'))
{
Format::Dmarc
Format::Dmarc(())
} else {
continue;
}
@ -213,10 +236,11 @@ impl AnalyzeReport for Arc<SMTP> {
}
};
match report.format {
Format::Dmarc => match Report::parse_xml(&data) {
let report = match report.format {
Format::Dmarc(_) => match Report::parse_xml(&data) {
Ok(report) => {
report.log();
Format::Dmarc(report)
}
Err(err) => {
tracing::debug!(
@ -228,9 +252,10 @@ impl AnalyzeReport for Arc<SMTP> {
continue;
}
},
Format::Tls => match TlsReport::parse_json(&data) {
Format::Tls(_) => match TlsReport::parse_json(&data) {
Ok(report) => {
report.log();
Format::Tls(report)
}
Err(err) => {
tracing::debug!(
@ -242,9 +267,10 @@ impl AnalyzeReport for Arc<SMTP> {
continue;
}
},
Format::Arf => match Feedback::parse_arf(&data) {
Format::Arf(_) => match Feedback::parse_arf(&data) {
Some(report) => {
report.log();
Format::Arf(report.into_owned())
}
None => {
tracing::debug!(
@ -255,47 +281,72 @@ impl AnalyzeReport for Arc<SMTP> {
continue;
}
},
}
};
// Save report
if let Some(report_path) = &core.report.config.analysis.store {
let (report_format, extension) = match report.format {
Format::Dmarc => ("dmarc", "xml"),
Format::Tls => ("tlsrpt", "json"),
Format::Arf => ("arf", "txt"),
};
let c_extension = match report.compression {
Compression::None => "",
Compression::Gzip => ".gz",
Compression::Zip => ".zip",
};
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
// Store report
if let Some(expires_in) = &core.report.config.analysis.store {
let expires = now() + expires_in.as_secs();
let id = core
.report
.config
.analysis
.report_id
.fetch_add(1, Ordering::Relaxed);
.generate()
.unwrap_or(expires);
// Build path
let mut report_path = report_path.clone();
report_path.push(format!(
"{report_format}_{now}_{id}.{extension}{c_extension}"
));
if let Err(err) = std::fs::write(&report_path, report.data) {
tracing::warn!(
context = "report",
event = "error",
from = from,
"Failed to write incoming report to {}: {}",
report_path.display(),
err
);
let mut batch = BatchBuilder::new();
match report {
Format::Dmarc(report) => {
batch.set(
ValueClass::Report(ReportClass::Dmarc { id, expires }),
Bincode::new(IncomingReport {
from,
to,
subject,
report,
})
.serialize(),
);
}
Format::Tls(report) => {
batch.set(
ValueClass::Report(ReportClass::Tls { id, expires }),
Bincode::new(IncomingReport {
from,
to,
subject,
report,
})
.serialize(),
);
}
Format::Arf(report) => {
batch.set(
ValueClass::Report(ReportClass::Arf { id, expires }),
Bincode::new(IncomingReport {
from,
to,
subject,
report,
})
.serialize(),
);
}
}
let batch = batch.build();
let _enter = handle.enter();
handle.spawn(async move {
if let Err(err) = core.shared.default_data_store.write(batch).await {
tracing::warn!(
context = "report",
event = "error",
"Failed to write incoming report: {}",
err
);
}
});
}
break;
return;
}
});
}
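
The analysis closure runs on a blocking worker pool, so the async store write is hopped back onto the Tokio runtime through the `Handle` captured before spawning; this is also why the analysis test below switches to the multi-thread runtime flavor. The general pattern, as a sketch:

use tokio::runtime::Handle;

async fn analyze(bytes: Vec<u8>) {
    // Capture the runtime handle while still on the async side...
    let handle = Handle::current();
    std::thread::spawn(move || {
        // ...do the blocking work here, then hop back for the async write.
        let _parsed = bytes.len();
        handle.spawn(async move {
            // e.g. core.shared.default_data_store.write(batch).await
        });
    });
}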

View file

@ -73,7 +73,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
.with_dkim_domain(signature.domain())
.with_dkim_selector(signature.selector())
.with_dkim_identity(signature.identity())
.with_headers(message.raw_headers())
.with_headers(std::str::from_utf8(message.raw_headers()).unwrap_or_default())
.write_rfc5322(
(
self.core

View file

@ -129,7 +129,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
let mut auth_failure = self
.new_auth_failure(AuthFailureType::Dmarc, rejected)
.with_authentication_results(auth_results.to_string())
.with_headers(message.raw_headers());
.with_headers(std::str::from_utf8(message.raw_headers()).unwrap_or_default());
// Report the first failed signature
let dkim_failed = if let (
@ -300,7 +300,7 @@ impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {
}
impl SMTP {
pub async fn generate_dmarc_report(&self, event: ReportEvent) {
pub async fn send_dmarc_aggregate_report(&self, event: ReportEvent) {
let span = tracing::info_span!(
"dmarc-report",
domain = event.domain,
@ -308,16 +308,21 @@ impl SMTP {
range_to = event.due,
);
// Deserialize report
let dmarc = match self
.shared
.default_data_store
.get_value::<Bincode<DmarcFormat>>(ValueKey::from(ValueClass::Queue(
QueueClass::DmarcReportHeader(event.clone()),
)))
// Generate report
let mut serialized_size = serde_json::Serializer::new(SerializedSize::new(
self.eval_if(
&self.report.config.dmarc_aggregate.max_size,
&RecipientDomain::new(event.domain.as_str()),
)
.await
.unwrap_or(25 * 1024 * 1024),
));
let mut rua = Vec::new();
let report = match self
.generate_dmarc_aggregate_report(&event, &mut rua, Some(&mut serialized_size))
.await
{
Ok(Some(dmarc)) => dmarc.inner,
Ok(Some(report)) => report,
Ok(None) => {
tracing::warn!(
parent: &span,
@ -330,7 +335,7 @@ impl SMTP {
tracing::warn!(
parent: &span,
event = "error",
"Failed to read DMARC report: {}",
"Failed to read DMARC records: {}",
err
);
return;
@ -341,7 +346,7 @@ impl SMTP {
let rua = match self
.resolvers
.dns
.verify_dmarc_report_address(&event.domain, &dmarc.rua)
.verify_dmarc_report_address(&event.domain, &rua)
.await
{
Some(rcpts) => {
@ -355,7 +360,7 @@ impl SMTP {
parent: &span,
event = "failed",
reason = "unauthorized-rua",
rua = ?dmarc.rua,
rua = ?rua,
"Unauthorized external reporting addresses"
);
self.delete_dmarc_report(event).await;
@ -367,7 +372,7 @@ impl SMTP {
parent: &span,
event = "failed",
reason = "dns-failure",
rua = ?dmarc.rua,
rua = ?rua,
"Failed to validate external report addresses",
);
self.delete_dmarc_report(event).await;
@ -375,69 +380,8 @@ impl SMTP {
}
};
let mut serialized_size = serde_json::Serializer::new(SerializedSize::new(
self.eval_if(
&self.report.config.dmarc_aggregate.max_size,
&RecipientDomain::new(event.domain.as_str()),
)
.await
.unwrap_or(25 * 1024 * 1024),
));
let _ = serde::Serialize::serialize(&dmarc, &mut serialized_size);
// Serialize report
let config = &self.report.config.dmarc_aggregate;
// Fetch records
let records = match self
.fetch_dmarc_records(&event, &dmarc, Some(&mut serialized_size))
.await
{
Ok(records) => records,
Err(err) => {
tracing::warn!(
parent: &span,
event = "error",
"Failed to read DMARC records: {}",
err
);
return;
}
};
// Create report
let mut report = Report::new()
.with_policy_published(dmarc.policy)
.with_date_range_begin(event.seq_id)
.with_date_range_end(event.due)
.with_report_id(format!("{}_{}", event.policy_hash, event.seq_id))
.with_email(
self.eval_if(
&config.address,
&RecipientDomain::new(event.domain.as_str()),
)
.await
.unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string()),
);
if let Some(org_name) = self
.eval_if::<String, _>(
&config.org_name,
&RecipientDomain::new(event.domain.as_str()),
)
.await
{
report = report.with_org_name(org_name);
}
if let Some(contact_info) = self
.eval_if::<String, _>(
&config.contact_info,
&RecipientDomain::new(event.domain.as_str()),
)
.await
{
report = report.with_extra_contact_info(contact_info);
}
for record in records {
report = report.with_record(record);
}
let from_addr = self
.eval_if(
&config.address,
@ -472,12 +416,66 @@ impl SMTP {
self.delete_dmarc_report(event).await;
}
pub(crate) async fn fetch_dmarc_records(
pub(crate) async fn generate_dmarc_aggregate_report(
&self,
event: &ReportEvent,
dmarc: &DmarcFormat,
rua: &mut Vec<URI>,
mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>,
) -> store::Result<Vec<Record>> {
) -> store::Result<Option<Report>> {
// Deserialize report
let dmarc = match self
.shared
.default_data_store
.get_value::<Bincode<DmarcFormat>>(ValueKey::from(ValueClass::Queue(
QueueClass::DmarcReportHeader(event.clone()),
)))
.await?
{
Some(dmarc) => dmarc.inner,
None => {
return Ok(None);
}
};
let _ = std::mem::replace(rua, dmarc.rua);
// Create report
let config = &self.report.config.dmarc_aggregate;
let mut report = Report::new()
.with_policy_published(dmarc.policy)
.with_date_range_begin(event.seq_id)
.with_date_range_end(event.due)
.with_report_id(format!("{}_{}", event.policy_hash, event.seq_id))
.with_email(
self.eval_if(
&config.address,
&RecipientDomain::new(event.domain.as_str()),
)
.await
.unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string()),
);
if let Some(org_name) = self
.eval_if::<String, _>(
&config.org_name,
&RecipientDomain::new(event.domain.as_str()),
)
.await
{
report = report.with_org_name(org_name);
}
if let Some(contact_info) = self
.eval_if::<String, _>(
&config.contact_info,
&RecipientDomain::new(event.domain.as_str()),
)
.await
{
report = report.with_extra_contact_info(contact_info);
}
if let Some(serialized_size) = serialized_size.as_deref_mut() {
let _ = serde::Serialize::serialize(&report, serialized_size);
}
// Group duplicates
let from_key = ValueKey::from(ValueClass::Queue(QueueClass::DmarcReportEvent(
ReportEvent {
@ -522,12 +520,11 @@ impl SMTP {
)
.await?;
let mut records = Vec::with_capacity(record_map.len());
for (record, count) in record_map {
records.push(record.with_count(count));
report = report.with_record(record.with_count(count));
}
Ok(records)
Ok(Some(report))
}
pub async fn delete_dmarc_report(&self, event: ReportEvent) {
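
The refactor concentrates deserialization and record collection in `generate_dmarc_aggregate_report`, which returns `Ok(None)` when the report header is gone and hands the `rua` list back through the `&mut Vec<URI>` out-parameter (the `std::mem::replace` call). Caller flow, condensed from the hunk above as a sketch:

let mut rua = Vec::new();
let report = match self.generate_dmarc_aggregate_report(&event, &mut rua, None).await {
    Ok(Some(report)) => report, // records found; rua is now filled in
    Ok(None) => return,         // header missing: nothing to send
    Err(err) => {
        tracing::warn!("Failed to read DMARC records: {}", err);
        return;
    }
};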

View file

@ -311,6 +311,7 @@ impl SerializedSize {
impl io::Write for SerializedSize {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
//let c = print!(" (left: {}, buf: {})", self.bytes_left, buf.len());
let buf_len = buf.len();
if buf_len <= self.bytes_left {
self.bytes_left -= buf_len;
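
`SerializedSize` is an `io::Write` sink that only tracks a remaining byte budget: serializing a record into it is a dry run that starts failing once the report would exceed `max_size`, which is how the aggregate builders in this commit drop records past the quota. The idea in isolation (a sketch, not the crate's exact type):

use std::io;

struct SerializedSize {
    bytes_left: usize,
}

impl io::Write for SerializedSize {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len();
        if n <= self.bytes_left {
            self.bytes_left -= n; // still under budget
            Ok(n)
        } else {
            Err(io::Error::new(io::ErrorKind::Other, "size limit reached"))
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}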

View file

@ -70,7 +70,7 @@ impl SpawnReport for mpsc::Receiver<Event> {
match report_event {
QueueClass::DmarcReportHeader(event) if event.due <= now => {
if core_.try_lock_report(QueueClass::dmarc_lock(&event)).await {
core_.generate_dmarc_report(event).await;
core_.send_dmarc_aggregate_report(event).await;
}
}
QueueClass::TlsReportHeader(event) if event.due <= now => {
@ -83,12 +83,12 @@ impl SpawnReport for mpsc::Receiver<Event> {
}
}
for (domain_name, tls_report) in tls_reports {
for (_, tls_report) in tls_reports {
if core_
.try_lock_report(QueueClass::tls_lock(tls_report.first().unwrap()))
.await
{
core_.generate_tls_report(domain_name, tls_report).await;
core_.send_tls_aggregate_report(tls_report).await;
}
}
});

View file

@ -67,10 +67,10 @@ pub struct TlsFormat {
pub static TLS_HTTP_REPORT: parking_lot::Mutex<Vec<u8>> = parking_lot::Mutex::new(Vec::new());
impl SMTP {
pub async fn generate_tls_report(&self, domain_name: String, events: Vec<ReportEvent>) {
let (event_from, event_to, policy) = events
pub async fn send_tls_aggregate_report(&self, events: Vec<ReportEvent>) {
let (domain_name, event_from, event_to) = events
.first()
.map(|e| (e.seq_id, e.due, e.policy_hash))
.map(|e| (e.domain.as_str(), e.seq_id, e.due))
.unwrap();
let span = tracing::info_span!(
@ -80,107 +80,41 @@ impl SMTP {
range_to = event_to,
);
// Deserialize report
let config = &self.report.config.tls;
let mut report = TlsReport {
organization_name: self
.eval_if(
&config.org_name,
&RecipientDomain::new(domain_name.as_str()),
)
.await
.clone(),
date_range: DateRange {
start_datetime: DateTime::from_timestamp(event_from as i64),
end_datetime: DateTime::from_timestamp(event_to as i64),
},
contact_info: self
.eval_if(
&config.contact_info,
&RecipientDomain::new(domain_name.as_str()),
)
.await
.clone(),
report_id: format!("{}_{}", event_from, policy),
policies: Vec::with_capacity(events.len()),
};
// Generate report
let mut rua = Vec::new();
let mut serialized_size = serde_json::Serializer::new(SerializedSize::new(
self.eval_if(
&self.report.config.tls.max_size,
&RecipientDomain::new(domain_name.as_str()),
&RecipientDomain::new(domain_name),
)
.await
.unwrap_or(25 * 1024 * 1024),
));
let _ = serde::Serialize::serialize(&report, &mut serialized_size);
for event in &events {
// Deserialize report
let tls = match self
.shared
.default_data_store
.get_value::<Bincode<TlsFormat>>(ValueKey::from(ValueClass::Queue(
QueueClass::TlsReportHeader(event.clone()),
)))
.await
{
Ok(Some(dmarc)) => dmarc.inner,
Ok(None) => {
tracing::warn!(
parent: &span,
event = "missing",
"Failed to read DMARC report: Report not found"
);
continue;
}
Err(err) => {
tracing::warn!(
parent: &span,
event = "error",
"Failed to read DMARC report: {}",
err
);
continue;
}
};
let _ = serde::Serialize::serialize(&tls, &mut serialized_size);
// Fetch policy
match self
.fetch_tls_policy(event, tls.policy, Some(&mut serialized_size))
.await
{
Ok(policy) => {
report.policies.push(policy);
for entry in tls.rua {
if !rua.contains(&entry) {
rua.push(entry);
}
}
}
Err(err) => {
tracing::warn!(
parent: &span,
event = "error",
"Failed to read TLS report: {}",
err
);
return;
}
let report = match self
.generate_tls_aggregate_report(&events, &mut rua, Some(&mut serialized_size))
.await
{
Ok(Some(report)) => report,
Ok(None) => {
// This should not happen
tracing::warn!(
parent: &span,
event = "empty-report",
"No policies found in report"
);
self.delete_tls_report(events).await;
return;
}
}
if report.policies.is_empty() {
// This should not happen
tracing::warn!(
parent: &span,
event = "empty-report",
"No policies found in report"
);
self.delete_tls_report(events).await;
return;
}
Err(err) => {
tracing::warn!(
parent: &span,
event = "error",
"Failed to read TLS report: {}",
err
);
return;
}
};
// Compress and serialize report
let json = report.to_json();
@ -264,22 +198,23 @@ impl SMTP {
// Deliver report over SMTP
if !rcpts.is_empty() {
let config = &self.report.config.tls;
let from_addr = self
.eval_if(&config.address, &RecipientDomain::new(domain_name.as_str()))
.eval_if(&config.address, &RecipientDomain::new(domain_name))
.await
.unwrap_or_else(|| "MAILER-DAEMON@localhost".to_string());
let mut message = Vec::with_capacity(2048);
let _ = report.write_rfc5322_from_bytes(
&domain_name,
domain_name,
&self
.eval_if(
&self.report.config.submitter,
&RecipientDomain::new(domain_name.as_str()),
&RecipientDomain::new(domain_name),
)
.await
.unwrap_or_else(|| "localhost".to_string()),
(
self.eval_if(&config.name, &RecipientDomain::new(domain_name.as_str()))
self.eval_if(&config.name, &RecipientDomain::new(domain_name))
.await
.unwrap_or_else(|| "Mail Delivery Subsystem".to_string())
.as_str(),
@ -310,76 +245,139 @@ impl SMTP {
self.delete_tls_report(events).await;
}
pub(crate) async fn fetch_tls_policy(
pub(crate) async fn generate_tls_aggregate_report(
&self,
event: &ReportEvent,
policy_details: PolicyDetails,
events: &[ReportEvent],
rua: &mut Vec<ReportUri>,
mut serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>,
) -> store::Result<Policy> {
// Group duplicates
let mut total_success = 0;
let mut total_failure = 0;
) -> store::Result<Option<TlsReport>> {
let (domain_name, event_from, event_to, policy) = events
.first()
.map(|e| (e.domain.as_str(), e.seq_id, e.due, e.policy_hash))
.unwrap();
let config = &self.report.config.tls;
let mut report = TlsReport {
organization_name: self
.eval_if(&config.org_name, &RecipientDomain::new(domain_name))
.await
.clone(),
date_range: DateRange {
start_datetime: DateTime::from_timestamp(event_from as i64),
end_datetime: DateTime::from_timestamp(event_to as i64),
},
contact_info: self
.eval_if(&config.contact_info, &RecipientDomain::new(domain_name))
.await
.clone(),
report_id: format!("{}_{}", event_from, policy),
policies: Vec::with_capacity(events.len()),
};
let from_key = ValueKey::from(ValueClass::Queue(QueueClass::TlsReportEvent(ReportEvent {
due: event.due,
policy_hash: event.policy_hash,
seq_id: 0,
domain: event.domain.clone(),
})));
let to_key = ValueKey::from(ValueClass::Queue(QueueClass::TlsReportEvent(ReportEvent {
due: event.due,
policy_hash: event.policy_hash,
seq_id: u64::MAX,
domain: event.domain.clone(),
})));
let mut record_map = AHashMap::new();
self.shared
.default_data_store
.iterate(IterateParams::new(from_key, to_key).ascending(), |_, v| {
if let Some(failure_details) =
Bincode::<Option<FailureDetails>>::deserialize(v)?.inner
{
match record_map.entry(failure_details) {
Entry::Occupied(mut e) => {
total_failure += 1;
*e.get_mut() += 1;
Ok(true)
}
Entry::Vacant(e) => {
if serialized_size
.as_deref_mut()
.map_or(true, |serialized_size| {
serde::Serialize::serialize(e.key(), serialized_size).is_ok()
})
{
if let Some(serialized_size) = serialized_size.as_deref_mut() {
let _ = serde::Serialize::serialize(&report, serialized_size);
}
for event in events {
let tls = if let Some(tls) = self
.shared
.default_data_store
.get_value::<Bincode<TlsFormat>>(ValueKey::from(ValueClass::Queue(
QueueClass::TlsReportHeader(event.clone()),
)))
.await?
{
tls.inner
} else {
continue;
};
if let Some(serialized_size) = serialized_size.as_deref_mut() {
if serde::Serialize::serialize(&tls, serialized_size).is_err() {
continue;
}
}
// Group duplicates
let mut total_success = 0;
let mut total_failure = 0;
let from_key =
ValueKey::from(ValueClass::Queue(QueueClass::TlsReportEvent(ReportEvent {
due: event.due,
policy_hash: event.policy_hash,
seq_id: 0,
domain: event.domain.clone(),
})));
let to_key =
ValueKey::from(ValueClass::Queue(QueueClass::TlsReportEvent(ReportEvent {
due: event.due,
policy_hash: event.policy_hash,
seq_id: u64::MAX,
domain: event.domain.clone(),
})));
let mut record_map = AHashMap::new();
self.shared
.default_data_store
.iterate(IterateParams::new(from_key, to_key).ascending(), |_, v| {
if let Some(failure_details) =
Bincode::<Option<FailureDetails>>::deserialize(v)?.inner
{
match record_map.entry(failure_details) {
Entry::Occupied(mut e) => {
total_failure += 1;
e.insert(1u32);
*e.get_mut() += 1;
Ok(true)
} else {
Ok(false)
}
Entry::Vacant(e) => {
if serialized_size
.as_deref_mut()
.map_or(true, |serialized_size| {
serde::Serialize::serialize(e.key(), serialized_size)
.is_ok()
})
{
total_failure += 1;
e.insert(1u32);
Ok(true)
} else {
Ok(false)
}
}
}
} else {
total_success += 1;
Ok(true)
}
} else {
total_success += 1;
Ok(true)
}
})
.await?;
Ok(Policy {
policy: policy_details,
summary: Summary {
total_success,
total_failure,
},
failure_details: record_map
.into_iter()
.map(|(mut r, count)| {
r.failed_session_count = count;
r
})
.collect(),
.await?;
// Add policy
report.policies.push(Policy {
policy: tls.policy,
summary: Summary {
total_success,
total_failure,
},
failure_details: record_map
.into_iter()
.map(|(mut r, count)| {
r.failed_session_count = count;
r
})
.collect(),
});
// Add report URIs
for entry in tls.rua {
if !rua.contains(&entry) {
rua.push(entry);
}
}
}
Ok(if !report.policies.is_empty() {
Some(report)
} else {
None
})
}
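
Within each event, failure details are grouped through a hash map so identical failures collapse into one record with a session count, and every new entry is first serialized into the size budget so records past `max_size` are dropped. The counting idea in isolation:

use std::collections::HashMap;

fn main() {
    // Identical failure details collapse into a single record with a count.
    let mut record_map: HashMap<&str, u32> = HashMap::new();
    for detail in ["cert-expired", "sts-invalid", "cert-expired"] {
        *record_map.entry(detail).or_insert(0) += 1;
    }
    assert_eq!(record_map["cert-expired"], 2);
}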

View file

@ -87,7 +87,10 @@ impl CharUtils for char {
}
pub fn fn_cure_text<'x>(_: &'x Context<'x, SieveContext>, v: Vec<Variable>) -> Variable {
decancer::cure(v[0].to_string().as_ref()).into_str().into()
decancer::cure(v[0].to_string().as_ref(), decancer::Options::default())
.map(|s| s.into_str())
.unwrap_or_default()
.into()
}
pub fn fn_unicode_skeleton<'x>(_: &'x Context<'x, SieveContext>, v: Vec<Variable>) -> Variable {
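
decancer 3.x changed `cure` to take an `Options` argument and return a `Result` (it can now fail on some inputs), hence the `unwrap_or_default` fallback above. Standalone usage, matching the call shape in the diff:

// cure() returns Result<CuredString, Error>; into_str() yields a String.
let cured = decancer::cure("ⓗⓔⓛⓛⓞ", decancer::Options::default())
    .map(|s| s.into_str())
    .unwrap_or_default();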

View file

@ -9,8 +9,8 @@ utils = { path = "../utils" }
nlp = { path = "../nlp" }
rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] }
foundationdb = { version = "0.8.0", features = ["embedded-fdb-include"], optional = true }
rusqlite = { version = "0.30.0", features = ["bundled"], optional = true }
rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls"], optional = true }
rusqlite = { version = "0.31.0", features = ["bundled"], optional = true }
rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls", "no-verify-ssl"], optional = true }
tokio = { version = "1.23", features = ["sync", "fs", "io-util"] }
r2d2 = { version = "0.8.10", optional = true }
futures = { version = "0.3", optional = true }

View file

@ -390,7 +390,7 @@ impl FdbStore {
}
}
pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
// Obtain all empty bitmaps
let trx = self.db.create_trx()?;
let mut iter = trx.get_ranges(

View file

@ -270,7 +270,7 @@ impl MysqlStore {
trx.commit().await.map(|_| true)
}
pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
let mut conn = self.conn_pool.get_conn().await?;
let s = conn

View file

@ -287,7 +287,7 @@ impl PostgresStore {
trx.commit().await.map(|_| true)
}
pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
let conn = self.conn_pool.get().await?;
let s = conn

View file

@ -120,7 +120,7 @@ impl RocksDbStore {
.await
}
pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
let db = self.db.clone();
self.spawn_worker(move || {
let cf = db

View file

@ -203,7 +203,7 @@ impl SqliteStore {
.await
}
pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
let conn = self.conn_pool.get()?;
self.spawn_worker(move || {
conn.prepare_cached(&format!(

View file

@ -228,7 +228,7 @@ impl ConfigStore for Config {
schedules.push(PurgeSchedule {
cron,
store_id: store_id.to_string(),
store: PurgeStore::Bitmaps(store.clone()),
store: PurgeStore::Data(store.clone()),
});
}

View file

@ -294,7 +294,7 @@ impl LookupStore {
}
}
pub async fn purge_expired(&self) -> crate::Result<()> {
pub async fn purge_lookup_store(&self) -> crate::Result<()> {
match self {
LookupStore::Store(store) => {
// Delete expired keys

View file

@ -26,7 +26,7 @@ use std::ops::{BitAndAssign, Range};
use roaring::RoaringBitmap;
use crate::{
write::{key::KeySerializer, AnyKey, Batch, BitmapClass, ValueClass},
write::{key::KeySerializer, now, AnyKey, Batch, BitmapClass, ReportClass, ValueClass},
BitmapKey, Deserialize, IterateParams, Key, Store, ValueKey, SUBSPACE_BITMAPS,
SUBSPACE_INDEXES, SUBSPACE_LOGS, U32_LEN,
};
@ -241,18 +241,45 @@ impl Store {
}
}
pub async fn purge_bitmaps(&self) -> crate::Result<()> {
pub async fn purge_store(&self) -> crate::Result<()> {
// Delete expired reports
let now = now();
self.delete_range(
ValueKey::from(ValueClass::Report(ReportClass::Dmarc { id: 0, expires: 0 })),
ValueKey::from(ValueClass::Report(ReportClass::Dmarc {
id: u64::MAX,
expires: now,
})),
)
.await?;
self.delete_range(
ValueKey::from(ValueClass::Report(ReportClass::Tls { id: 0, expires: 0 })),
ValueKey::from(ValueClass::Report(ReportClass::Tls {
id: u64::MAX,
expires: now,
})),
)
.await?;
self.delete_range(
ValueKey::from(ValueClass::Report(ReportClass::Arf { id: 0, expires: 0 })),
ValueKey::from(ValueClass::Report(ReportClass::Arf {
id: u64::MAX,
expires: now,
})),
)
.await?;
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.purge_bitmaps().await,
Self::SQLite(store) => store.purge_store().await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.purge_bitmaps().await,
Self::FoundationDb(store) => store.purge_store().await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.purge_bitmaps().await,
Self::PostgreSQL(store) => store.purge_store().await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.purge_bitmaps().await,
Self::MySQL(store) => store.purge_store().await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.purge_bitmaps().await,
Self::RocksDb(store) => store.purge_store().await,
}
}
@ -462,7 +489,7 @@ impl Store {
self.blob_expire_all().await;
self.purge_blobs(blob_store).await.unwrap();
self.purge_bitmaps().await.unwrap();
self.purge_store().await.unwrap();
let store = self.clone();
let mut failed = false;

View file

@ -31,8 +31,8 @@ use crate::{
};
use super::{
AnyKey, BitmapClass, BlobOp, DirectoryClass, LookupClass, QueueClass, ReportEvent, TagValue,
ValueClass,
AnyKey, BitmapClass, BlobOp, DirectoryClass, LookupClass, QueueClass, ReportClass, ReportEvent,
TagValue, ValueClass,
};
pub struct KeySerializer {
@ -348,6 +348,17 @@ impl<T: AsRef<ValueClass> + Sync + Send> Key for ValueKey<T> {
QueueClass::QuotaCount(key) => serializer.write(55u8).write(key.as_slice()),
QueueClass::QuotaSize(key) => serializer.write(56u8).write(key.as_slice()),
},
ValueClass::Report(report) => match report {
ReportClass::Tls { id, expires } => {
serializer.write(60u8).write(*expires).write(*id)
}
ReportClass::Dmarc { id, expires } => {
serializer.write(61u8).write(*expires).write(*id)
}
ReportClass::Arf { id, expires } => {
serializer.write(62u8).write(*expires).write(*id)
}
},
}
.finalize()
}
@ -503,6 +514,7 @@ impl ValueClass {
}
QueueClass::QuotaCount(v) | QueueClass::QuotaSize(v) => v.len(),
},
ValueClass::Report(_) => U64_LEN * 2 + 1,
}
}
}
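
The serializer writes `expires` before `id`, so within each subspace byte (60 = TLS, 61 = DMARC, 62 = ARF) report keys sort by expiry, assuming the usual big-endian key encoding. That ordering is what lets the `purge_store` hunk above remove every expired report with three plain range deletes. Layout sketch:

// ValueClass::Report key layout, per the serializer above:
//   [subspace: u8 = 60|61|62][expires: u64][id: u64]
// delete_range({ id: 0, expires: 0 } .. { id: u64::MAX, expires: now })
// therefore covers exactly the reports whose expiry has passed.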

View file

@ -140,6 +140,7 @@ pub enum ValueClass {
IndexEmail(u64),
Config(Vec<u8>),
Queue(QueueClass),
Report(ReportClass),
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
@ -172,6 +173,13 @@ pub enum QueueClass {
QuotaSize(Vec<u8>),
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum ReportClass {
Tls { id: u64, expires: u64 },
Dmarc { id: u64, expires: u64 },
Arf { id: u64, expires: u64 },
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub struct QueueEvent {
pub due: u64,

View file

@ -29,7 +29,7 @@ use utils::config::cron::SimpleCron;
use crate::{BlobStore, LookupStore, Store};
pub enum PurgeStore {
Bitmaps(Store),
Data(Store),
Blobs { store: Store, blob_store: BlobStore },
Lookup(LookupStore),
}
@ -62,11 +62,11 @@ impl PurgeSchedule {
}
let result = match &self.store {
PurgeStore::Bitmaps(store) => store.purge_bitmaps().await,
PurgeStore::Data(store) => store.purge_store().await,
PurgeStore::Blobs { store, blob_store } => {
store.purge_blobs(blob_store.clone()).await
}
PurgeStore::Lookup(store) => store.purge_expired().await,
PurgeStore::Lookup(store) => store.purge_lookup_store().await,
};
if let Err(err) = result {
@ -85,7 +85,7 @@ impl PurgeSchedule {
impl Display for PurgeStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PurgeStore::Bitmaps(_) => write!(f, "bitmaps"),
PurgeStore::Data(_) => write!(f, "bitmaps"),
PurgeStore::Blobs { .. } => write!(f, "blobs"),
PurgeStore::Lookup(_) => write!(f, "expired keys"),
}

View file

@ -5,7 +5,7 @@ edition = "2021"
resolver = "2"
[dependencies]
rustls = { version = "0.22", features = ["tls12"]}
rustls = { version = "0.22", default-features = false, features = ["tls12"]}
rustls-pemfile = "2.0"
rustls-pki-types = { version = "1" }
tokio = { version = "1.23", features = ["net", "macros"] }
@ -32,7 +32,7 @@ base64 = "0.21"
serde_json = "1.0"
rcgen = "0.12"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-webpki-roots"]}
x509-parser = "0.15.0"
x509-parser = "0.16.0"
pem = "3.0"
parking_lot = "0.12"
arc-swap = "1.6.0"

View file

@ -8,7 +8,7 @@
[report.analysis]
addresses = ["dmarc@*", "abuse@*", "postmaster@*"]
forward = true
#store = "%{BASE_PATH}%/incoming"
store = "30d"
[report.dsn]
from-name = "'Mail Delivery Subsystem'"

View file

@ -23,7 +23,7 @@
use std::time::Duration;
use reqwest::header::AUTHORIZATION;
use reqwest::{header::AUTHORIZATION, Method};
use serde::{de::DeserializeOwned, Deserialize};
pub mod queue;
@ -36,19 +36,22 @@ pub enum Response<T> {
Error { error: String, details: String },
}
pub async fn send_manage_request<T: DeserializeOwned>(query: &str) -> Result<Response<T>, String> {
send_manage_request_raw(query).await.map(|result| {
pub async fn send_manage_request<T: DeserializeOwned>(
method: Method,
query: &str,
) -> Result<Response<T>, String> {
send_manage_request_raw(method, query).await.map(|result| {
serde_json::from_str::<Response<T>>(&result).unwrap_or_else(|err| panic!("{err}: {result}"))
})
}
pub async fn send_manage_request_raw(query: &str) -> Result<String, String> {
pub async fn send_manage_request_raw(method: Method, query: &str) -> Result<String, String> {
reqwest::Client::builder()
.timeout(Duration::from_millis(500))
.danger_accept_invalid_certs(true)
.build()
.unwrap()
.get(format!("https://127.0.0.1:9980{query}"))
.request(method, format!("https://127.0.0.1:9980{query}"))
.header(AUTHORIZATION, "Basic YWRtaW46c2VjcmV0")
.send()
.await
@ -69,6 +72,16 @@ impl<T> Response<T> {
}
}
pub fn try_unwrap_data(self) -> Option<T> {
match self {
Response::Data { data } => Some(data),
Response::Error { error, .. } if error == "not-found" => None,
Response::Error { error, details } => {
panic!("Expected data, found error {error:?}: {details:?}")
}
}
}
pub fn unwrap_error(self) -> (String, String) {
match self {
Response::Error { error, details } => (error, details),

View file

@ -30,7 +30,7 @@ use ahash::{AHashMap, HashMap, HashSet};
use directory::core::config::ConfigDirectory;
use mail_auth::MX;
use mail_parser::DateTime;
use reqwest::{header::AUTHORIZATION, StatusCode};
use reqwest::{header::AUTHORIZATION, Method, StatusCode};
use store::Store;
use utils::config::{if_block::IfBlock, Config, ServerProtocol};
@ -61,9 +61,9 @@ member-of = ["superusers"]
#[derive(serde::Deserialize)]
#[allow(dead_code)]
struct List<T> {
items: Vec<T>,
total: usize,
pub(super) struct List<T> {
pub items: Vec<T>,
pub total: usize,
}
#[tokio::test]
@ -192,7 +192,7 @@ async fn manage_queue() {
);
// Fetch and validate messages
let ids = send_manage_request::<List<QueueId>>("/api/queue/list")
let ids = send_manage_request::<List<QueueId>>(Method::GET, "/api/queue/messages")
.await
.unwrap()
.unwrap_data()
@ -277,28 +277,28 @@ async fn manage_queue() {
// Test list search
for (query, expected_ids) in [
(
"/api/queue/list?from=bill1@foobar.net".to_string(),
"/api/queue/messages?from=bill1@foobar.net".to_string(),
vec!["a"],
),
(
"/api/queue/list?to=foobar.org".to_string(),
"/api/queue/messages?to=foobar.org".to_string(),
vec!["d", "e", "f"],
),
(
"/api/queue/list?from=bill3@foobar.net&to=rcpt5@example1.com".to_string(),
"/api/queue/messages?from=bill3@foobar.net&to=rcpt5@example1.com".to_string(),
vec!["c"],
),
(
format!("/api/queue/list?before={test_search}"),
format!("/api/queue/messages?before={test_search}"),
vec!["a", "b"],
),
(
format!("/api/queue/list?after={test_search}"),
format!("/api/queue/messages?after={test_search}"),
vec!["d", "e", "f", "c"],
),
] {
let expected_ids = HashSet::from_iter(expected_ids.into_iter().map(|s| s.to_string()));
let ids = send_manage_request::<List<QueueId>>(&query)
let ids = send_manage_request::<List<QueueId>>(Method::GET, &query)
.await
.unwrap()
.unwrap_data()
@ -310,27 +310,24 @@ async fn manage_queue() {
}
// Retry delivery
assert_eq!(
send_manage_request::<Vec<bool>>(&format!(
"/api/queue/retry?id={},{}",
id_map.get("e").unwrap(),
id_map.get("f").unwrap()
))
.await
.unwrap()
.unwrap_data(),
vec![true, true]
);
assert_eq!(
send_manage_request::<Vec<bool>>(&format!(
"/api/queue/retry?id={}&filter=example1.org&at=2200-01-01T00:00:00Z",
for id in [id_map.get("e").unwrap(), id_map.get("f").unwrap()] {
assert!(
send_manage_request::<bool>(Method::PATCH, &format!("/api/queue/messages/{id}",))
.await
.unwrap()
.unwrap_data(),
);
}
assert!(send_manage_request::<bool>(
Method::PATCH,
&format!(
"/api/queue/messages/{}?filter=example1.org&at=2200-01-01T00:00:00Z",
id_map.get("a").unwrap(),
))
.await
.unwrap()
.unwrap_data(),
vec![true]
);
)
)
.await
.unwrap()
.unwrap_data());
// Expect delivery to john@foobar.org
tokio::time::sleep(Duration::from_millis(100)).await;
@ -384,22 +381,24 @@ async fn manage_queue() {
("c", "rcpt6@example2.com"),
("d", ""),
] {
assert_eq!(
send_manage_request::<Vec<bool>>(&format!(
"/api/queue/cancel?id={}{}{}",
id_map.get(id).unwrap(),
if !filter.is_empty() { "&filter=" } else { "" },
filter
))
assert!(
send_manage_request::<bool>(
Method::DELETE,
&format!(
"/api/queue/messages/{}{}{}",
id_map.get(id).unwrap(),
if !filter.is_empty() { "?filter=" } else { "" },
filter
)
)
.await
.unwrap()
.unwrap_data(),
vec![true],
"failed for {id}: {filter}"
);
}
assert_eq!(
send_manage_request::<List<QueueId>>("/api/queue/list")
send_manage_request::<List<QueueId>>(Method::GET, "/api/queue/messages")
.await
.unwrap()
.unwrap_data()
@ -485,14 +484,16 @@ fn assert_timestamp(timestamp: &DateTime, expected: i64, ctx: &str, message: &Me
}
async fn get_messages(ids: &[QueueId]) -> Vec<Option<Message>> {
send_manage_request(&format!(
"/api/queue/status?id={}",
ids.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(",")
))
.await
.unwrap()
.unwrap_data()
let mut results = Vec::with_capacity(ids.len());
for id in ids {
let message =
send_manage_request::<Message>(Method::GET, &format!("/api/queue/messages/{id}",))
.await
.unwrap()
.try_unwrap_data();
results.push(message);
}
results
}

View file

@ -34,12 +34,16 @@ use mail_auth::{
ActionDisposition, DmarcResult, Record,
},
};
use reqwest::Method;
use store::Store;
use tokio::sync::mpsc;
use utils::config::{if_block::IfBlock, Config, ServerProtocol};
use crate::smtp::{
inbound::dummy_stores, management::send_manage_request, outbound::start_test_server, TestConfig,
inbound::dummy_stores,
management::{queue::List, send_manage_request},
outbound::start_test_server,
TestConfig,
};
use smtp::{
config::AggregateFrequency,
@ -141,10 +145,11 @@ async fn manage_reports() {
.await;
// List reports
let ids = send_manage_request::<Vec<String>>("/admin/report/list")
let ids = send_manage_request::<List<String>>(Method::GET, "/api/queue/reports")
.await
.unwrap()
.unwrap_data();
.unwrap_data()
.items;
assert_eq!(ids.len(), 4);
let mut id_map = AHashMap::new();
let mut id_map_rev = AHashMap::new();
@ -186,18 +191,19 @@ async fn manage_reports() {
// Test list search
for (query, expected_ids) in [
("/admin/report/list?type=dmarc", vec!["a", "b"]),
("/admin/report/list?type=tls", vec!["c", "d"]),
("/admin/report/list?domain=foobar.org", vec!["a", "c"]),
("/admin/report/list?domain=foobar.net", vec!["b", "d"]),
("/admin/report/list?domain=foobar.org&type=dmarc", vec!["a"]),
("/admin/report/list?domain=foobar.net&type=tls", vec!["d"]),
("/api/queue/reports?type=dmarc", vec!["a", "b"]),
("/api/queue/reports?type=tls", vec!["c", "d"]),
("/api/queue/reports?domain=foobar.org", vec!["a", "c"]),
("/api/queue/reports?domain=foobar.net", vec!["b", "d"]),
("/api/queue/reports?domain=foobar.org&type=dmarc", vec!["a"]),
("/api/queue/reports?domain=foobar.net&type=tls", vec!["d"]),
] {
let expected_ids = HashSet::from_iter(expected_ids.into_iter().map(|s| s.to_string()));
let ids = send_manage_request::<Vec<String>>(query)
let ids = send_manage_request::<List<String>>(Method::GET, query)
.await
.unwrap()
.unwrap_data()
.items
.into_iter()
.map(|id| id_map_rev.get(&id).unwrap().clone())
.collect::<HashSet<_>>();
@ -206,23 +212,23 @@ async fn manage_reports() {
// Cancel reports
for id in ["a", "b"] {
assert_eq!(
send_manage_request::<Vec<bool>>(&format!(
"/admin/report/cancel?id={}",
id_map.get(id).unwrap(),
))
assert!(
send_manage_request::<bool>(
Method::DELETE,
&format!("/api/queue/reports/{}", id_map.get(id).unwrap(),)
)
.await
.unwrap()
.unwrap_data(),
vec![true],
"failed for {id}"
);
}
assert_eq!(
send_manage_request::<Vec<String>>("/admin/report/list")
send_manage_request::<List<String>>(Method::GET, "/api/queue/reports")
.await
.unwrap()
.unwrap_data()
.items
.len(),
2
);
@ -241,8 +247,16 @@ async fn manage_reports() {
}
async fn get_reports(ids: &[String]) -> Vec<Option<Report>> {
send_manage_request(&format!("/admin/report/status?id={}", ids.join(",")))
.await
.unwrap()
.unwrap_data()
let mut results = Vec::with_capacity(ids.len());
for id in ids {
let report =
send_manage_request::<Report>(Method::GET, &format!("/api/queue/reports/{id}",))
.await
.unwrap()
.try_unwrap_data();
results.push(report);
}
results
}

View file

@ -408,7 +408,7 @@ impl TestConfig for ReportConfig {
addresses: vec![],
forward: true,
store: None,
report_id: 0.into(),
report_id: SnowflakeIdGenerator::new(),
},
dkim: Report::test(),
spf: Report::test(),

View file

@ -21,25 +21,25 @@
* for more details.
*/
use std::{fs, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};
use crate::smtp::{
inbound::TestQueueEvent, make_temp_dir, session::TestSession, TestConfig, TestSMTP,
};
use crate::smtp::{inbound::TestQueueEvent, session::TestSession, TestConfig, TestSMTP};
use smtp::{
config::AddressMatch,
core::{Session, SMTP},
};
use store::{
write::{ReportClass, ValueClass},
IterateParams, ValueKey,
};
use utils::config::if_block::IfBlock;
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn report_analyze() {
let mut core = SMTP::test();
// Create temp dir for queue
let mut qr = core.init_test_queue("smtp_analyze_report_test");
let report_dir = make_temp_dir("smtp_report_incoming", true);
let config = &mut core.session.config.rcpt;
config.relay = IfBlock::new(true);
let config = &mut core.session.config.data;
@ -51,10 +51,16 @@ async fn report_analyze() {
AddressMatch::Equals("feedback@foobar.org".to_string()),
];
config.forward = false;
config.store = report_dir.temp_dir.clone().into();
config.store = Duration::from_secs(1).into();
//config.store = Duration::from_secs(86400).into();
// Create test message
let core = Arc::new(core);
/*let rx_manage = crate::smtp::outbound::start_test_server(
core.clone(),
&[utils::config::ServerProtocol::Http],
);*/
let mut session = Session::test(core.clone());
session.data.remote_ip_str = "10.0.0.1".to_string();
session.eval_session_params().await;
@ -84,14 +90,53 @@ async fn report_analyze() {
}
tokio::time::sleep(Duration::from_millis(200)).await;
//let c = tokio::time::sleep(Duration::from_secs(86400)).await;
// Purging the database shouldn't remove the reports
qr.store.purge_store().await.unwrap();
// Make sure the reports are in the store
let mut total_reports = 0;
for entry in fs::read_dir(&report_dir.temp_dir).unwrap() {
let path = entry.unwrap().path();
assert_ne!(fs::metadata(&path).unwrap().len(), 0);
total_reports += 1;
}
qr.store
.iterate(
IterateParams::new(
ValueKey::from(ValueClass::Report(ReportClass::Tls { id: 0, expires: 0 })),
ValueKey::from(ValueClass::Report(ReportClass::Arf {
id: u64::MAX,
expires: u64::MAX,
})),
),
|_, _| {
total_reports += 1;
Ok(true)
},
)
.await
.unwrap();
assert_eq!(total_reports, total_reports_received);
// Wait one second, purge, and make sure they are gone
tokio::time::sleep(Duration::from_secs(1)).await;
qr.store.purge_store().await.unwrap();
let mut total_reports = 0;
qr.store
.iterate(
IterateParams::new(
ValueKey::from(ValueClass::Report(ReportClass::Tls { id: 0, expires: 0 })),
ValueKey::from(ValueClass::Report(ReportClass::Arf {
id: u64::MAX,
expires: u64::MAX,
})),
),
|_, _| {
total_reports += 1;
Ok(true)
},
)
.await
.unwrap();
assert_eq!(total_reports, 0);
// Test delivery to non-report addresses
session
.send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")

View file

@ -117,7 +117,7 @@ async fn report_dmarc() {
assert_eq!(reports.len(), 1);
match reports.into_iter().next().unwrap() {
QueueClass::DmarcReportHeader(event) => {
core.generate_dmarc_report(event).await;
core.send_dmarc_aggregate_report(event).await;
}
_ => unreachable!(),
}

View file

@ -85,7 +85,7 @@ async fn report_tls() {
for (policy, rt) in [
(
smtp::reporting::PolicyType::None,
smtp::reporting::PolicyType::None, // Quota limited at 1532 bytes, this should not be included in the report.
ResultType::CertificateExpired,
),
(
@ -101,7 +101,7 @@ async fn report_tls() {
ResultType::StsPolicyInvalid,
),
(
smtp::reporting::PolicyType::Sts(None), // Quota limited at 1532 bytes, this should not be included in the report.
smtp::reporting::PolicyType::Sts(None),
ResultType::StsWebpkiInvalid,
),
] {
@ -128,8 +128,7 @@ async fn report_tls() {
_ => unreachable!(),
}
}
core.generate_tls_report(tls_reports.first().unwrap().domain.clone(), tls_reports)
.await;
core.send_tls_aggregate_report(tls_reports).await;
// Expect report
let message = qr.expect_message().await;
@ -167,10 +166,10 @@ async fn report_tls() {
}
PolicyType::Sts => {
seen[1] = true;
assert_eq!(policy.summary.total_failure, 2);
assert_eq!(policy.summary.total_failure, 3);
assert_eq!(policy.summary.total_success, 0);
assert_eq!(policy.policy.policy_domain, "foobar.org");
assert_eq!(policy.failure_details.len(), 2);
assert_eq!(policy.failure_details.len(), 3);
assert!(policy
.failure_details
.iter()
@ -182,14 +181,14 @@ async fn report_tls() {
}
PolicyType::NoPolicyFound => {
seen[2] = true;
assert_eq!(policy.summary.total_failure, 1);
assert_eq!(policy.summary.total_failure, 0);
assert_eq!(policy.summary.total_success, 2);
assert_eq!(policy.policy.policy_domain, "foobar.org");
assert_eq!(policy.failure_details.len(), 1);
assert_eq!(
assert_eq!(policy.failure_details.len(), 0);
/*assert_eq!(
policy.failure_details.first().unwrap().result_type,
ResultType::CertificateExpired
);
);*/
}
PolicyType::Other => unreachable!(),
}
@ -218,8 +217,7 @@ async fn report_tls() {
assert_eq!(reports.len(), 1);
match reports.into_iter().next().unwrap() {
QueueClass::TlsReportHeader(event) => {
core.generate_tls_report(event.domain.clone(), vec![event])
.await;
core.send_tls_aggregate_report(vec![event]).await;
}
_ => unreachable!(),
}

View file

@ -57,7 +57,7 @@ pub async fn lookup_tests() {
.key_set(key.clone(), "world".to_string().into_bytes(), None)
.await
.unwrap();
store.purge_expired().await.unwrap();
store.purge_lookup_store().await.unwrap();
assert_eq!(
store.key_get::<String>(key.clone()).await.unwrap(),
Some("world".to_string())
@ -75,7 +75,7 @@ pub async fn lookup_tests() {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
assert_eq!(None, store.key_get::<String>(key.clone()).await.unwrap());
store.purge_expired().await.unwrap();
store.purge_lookup_store().await.unwrap();
if let LookupStore::Store(store) = &store {
store.assert_is_empty(store.clone().into()).await;
}
@ -106,7 +106,7 @@ pub async fn lookup_tests() {
.unwrap();
assert_eq!(1, store.counter_get(key.clone()).await.unwrap());
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
store.purge_expired().await.unwrap();
store.purge_lookup_store().await.unwrap();
assert_eq!(0, store.counter_get(key.clone()).await.unwrap());
// Test rate limiter
@ -127,7 +127,7 @@ pub async fn lookup_tests() {
.unwrap()
.is_none());
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
store.purge_expired().await.unwrap();
store.purge_lookup_store().await.unwrap();
if let LookupStore::Store(store) = &store {
store.assert_is_empty(store.clone().into()).await;
}