Webhooks implementation (closes #480 closes #233)

This commit is contained in:
mdecimus 2024-06-20 19:11:15 +02:00
parent 5ff6bc895c
commit 68a189ed9f
72 changed files with 2368 additions and 380 deletions

367
Cargo.lock generated
View file

@ -202,7 +202,7 @@ dependencies = [
"base64ct",
"blake2",
"cpufeatures",
"password-hash 0.5.0",
"password-hash",
]
[[package]]
@ -369,18 +369,17 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "attohttpc"
version = "0.22.0"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fcf00bc6d5abb29b5f97e3c61a90b6d3caa12f3faf897d4a3e3607c050a35a7"
checksum = "0f77d243921b0979fbbd728dd2d5162e68ac8252976797c24eb5b3a6af9090dc"
dependencies = [
"http 0.2.12",
"log",
"rustls 0.20.9",
"rustls 0.21.12",
"serde",
"serde_json",
"url",
"webpki",
"webpki-roots 0.22.6",
"webpki-roots 0.25.4",
]
[[package]]
@ -391,14 +390,14 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
name = "aws-creds"
version = "0.34.1"
version = "0.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3776743bb68d4ad02ba30ba8f64373f1be4e082fe47651767171ce75bb2f6cf5"
checksum = "390ad3b77f3e21e01a4a0355865853b681daf1988510b0b15e31c0c4ae7eb0f6"
dependencies = [
"attohttpc",
"dirs",
"home",
"log",
"quick-xml 0.26.0",
"quick-xml 0.30.0",
"rust-ini",
"serde",
"thiserror",
@ -633,7 +632,7 @@ dependencies = [
"arrayvec",
"cc",
"cfg-if",
"constant_time_eq 0.3.0",
"constant_time_eq",
]
[[package]]
@ -757,9 +756,9 @@ dependencies = [
[[package]]
name = "bytemuck"
version = "1.16.0"
version = "1.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5"
checksum = "b236fc92302c97ed75b38da1f4917b5cdda4984745740f153a5d3059e48d725e"
[[package]]
name = "byteorder"
@ -1024,9 +1023,9 @@ dependencies = [
"futures",
"hostname 0.4.0",
"hyper 1.3.1",
"idna 0.5.0",
"imagesize",
"infer",
"idna 1.0.1",
"imagesize 0.13.0",
"infer 0.16.0",
"jmap_proto",
"mail-auth",
"mail-parser",
@ -1067,7 +1066,8 @@ dependencies = [
"utils",
"whatlang",
"x509-parser 0.16.0",
"zip 0.6.6",
"xxhash-rust",
"zip",
]
[[package]]
@ -1089,18 +1089,32 @@ version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
[[package]]
name = "const-random"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
dependencies = [
"const-random-macro",
]
[[package]]
name = "const-random-macro"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
dependencies = [
"getrandom",
"once_cell",
"tiny-keccak",
]
[[package]]
name = "const_panic"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6051f239ecec86fde3410901ab7860d458d160371533842974fc61f96d15879b"
[[package]]
name = "constant_time_eq"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "constant_time_eq"
version = "0.3.0"
@ -1289,16 +1303,15 @@ dependencies = [
[[package]]
name = "curve25519-dalek"
version = "4.1.2"
version = "4.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348"
checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
dependencies = [
"cfg-if",
"cpufeatures",
"curve25519-dalek-derive",
"digest 0.10.7",
"fiat-crypto",
"platforms",
"rustc_version 0.4.0",
"subtle",
"zeroize",
@ -1426,12 +1439,25 @@ dependencies = [
]
[[package]]
name = "deadpool-postgres"
name = "deadpool"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda39fa1cfff190d8924d447ad04fd22772c250438ca5ce1dfb3c80621c05aaa"
checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed"
dependencies = [
"deadpool",
"deadpool-runtime",
"num_cpus",
"tokio",
]
[[package]]
name = "deadpool-postgres"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ab8a4ea925ce79678034870834602a2980f4b88c09e97feb266496dbb4493d2"
dependencies = [
"async-trait",
"deadpool 0.12.1",
"getrandom",
"tokio",
"tokio-postgres",
"tracing",
@ -1448,9 +1474,9 @@ dependencies = [
[[package]]
name = "decancer"
version = "3.2.0"
version = "3.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a053d1eb02357b64ce1a85133b1919466fc164a87fae45c11330dd41d0483b1e"
checksum = "89dc48fa5c407ad29590bc5cc17a56684ed7b4f8215a9b65a31644ff051557f4"
dependencies = [
"lazy_static",
"paste",
@ -1591,7 +1617,7 @@ dependencies = [
"ahash 0.8.11",
"argon2",
"async-trait",
"deadpool",
"deadpool 0.10.0",
"futures",
"jmap_proto",
"ldap3",
@ -1601,8 +1627,8 @@ dependencies = [
"mail-send",
"md5",
"parking_lot",
"password-hash 0.5.0",
"pbkdf2 0.12.2",
"password-hash",
"pbkdf2",
"pwhash",
"regex",
"rustls 0.22.4",
@ -1619,15 +1645,6 @@ dependencies = [
"utils",
]
[[package]]
name = "dirs"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-next"
version = "2.0.0"
@ -1638,17 +1655,6 @@ dependencies = [
"dirs-sys-next",
]
[[package]]
name = "dirs-sys"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
dependencies = [
"libc",
"redox_users",
"winapi",
]
[[package]]
name = "dirs-sys-next"
version = "0.1.2"
@ -1673,9 +1679,12 @@ dependencies = [
[[package]]
name = "dlv-list"
version = "0.3.0"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f"
dependencies = [
"const-random",
]
[[package]]
name = "dns-update"
@ -1950,6 +1959,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95765f67b4b18863968b4a1bd5bb576f732b29a4a28c7cd84c09fa3e2875f33c"
[[package]]
name = "fastrand"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
[[package]]
name = "ff"
version = "0.13.0"
@ -2360,6 +2375,12 @@ dependencies = [
"ahash 0.7.8",
]
[[package]]
name = "hashbrown"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
[[package]]
name = "hashbrown"
version = "0.14.5"
@ -2699,6 +2720,19 @@ dependencies = [
"tokio-io-timeout",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper 0.14.29",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "hyper-util"
version = "0.1.5"
@ -2897,9 +2931,9 @@ dependencies = [
[[package]]
name = "idna"
version = "1.0.0"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4716a3a0933a1d01c2f72450e89596eb51dd34ef3c211ccd875acdf1f8fe47ed"
checksum = "44a986806a1cc899952ba462bc1f28afbfd5850ab6cb030ccb20dd02cc527a24"
dependencies = [
"icu_normalizer",
"icu_properties",
@ -2913,6 +2947,12 @@ version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "029d73f573d8e8d63e6d5020011d3255b28c3ba85d6cf870a07184ed23de9284"
[[package]]
name = "imagesize"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edcd27d72f2f071c64249075f42e205ff93c9a4c5f6c6da53e79ed9f9832c285"
[[package]]
name = "imap"
version = "0.8.1"
@ -2993,6 +3033,15 @@ dependencies = [
"cfb",
]
[[package]]
name = "infer"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc150e5ce2330295b8616ce0e3f53250e53af31759a9dbedad1621ba29151847"
dependencies = [
"cfb",
]
[[package]]
name = "inout"
version = "0.1.3"
@ -3496,9 +3545,9 @@ dependencies = [
[[package]]
name = "mail-auth"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282cd8ca9254ca13a30f3b6d940057de71ae568303497c7aa4fbab36be2f0aea"
checksum = "9bd9d657de66a3d5ac360c3eab8c9f5cac2565f2b97cc032d5de4c900ef470de"
dependencies = [
"ahash 0.8.11",
"flate2",
@ -3507,14 +3556,14 @@ dependencies = [
"mail-builder",
"mail-parser",
"parking_lot",
"quick-xml 0.31.0",
"quick-xml 0.32.0",
"rand",
"ring 0.17.8",
"rsa",
"rustls-pemfile 2.1.2",
"serde",
"serde_json",
"zip 2.1.3",
"zip",
]
[[package]]
@ -3818,7 +3867,24 @@ dependencies = [
"thiserror",
"time",
"uuid",
"zstd 0.13.1",
"zstd",
]
[[package]]
name = "native-tls"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466"
dependencies = [
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
@ -4160,12 +4226,12 @@ dependencies = [
[[package]]
name = "ordered-multimap"
version = "0.4.3"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a"
checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e"
dependencies = [
"dlv-list",
"hashbrown 0.12.3",
"hashbrown 0.13.2",
]
[[package]]
@ -4209,17 +4275,6 @@ dependencies = [
"windows-targets 0.52.5",
]
[[package]]
name = "password-hash"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700"
dependencies = [
"base64ct",
"rand_core",
"subtle",
]
[[package]]
name = "password-hash"
version = "0.5.0"
@ -4237,18 +4292,6 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "pbkdf2"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83a0692ec44e4cf1ef28ca317f14f8f07da2d95ec3fa01f86e4467b725e60917"
dependencies = [
"digest 0.10.7",
"hmac 0.12.1",
"password-hash 0.4.2",
"sha2 0.10.8",
]
[[package]]
name = "pbkdf2"
version = "0.12.2"
@ -4257,7 +4300,7 @@ checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2"
dependencies = [
"digest 0.10.7",
"hmac 0.12.1",
"password-hash 0.5.0",
"password-hash",
"sha2 0.10.8",
]
@ -4416,12 +4459,6 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
[[package]]
name = "platforms"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7"
[[package]]
name = "polyval"
version = "0.6.2"
@ -4666,9 +4703,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quick-xml"
version = "0.26.0"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956"
dependencies = [
"memchr",
"serde",
@ -4683,6 +4720,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "quick-xml"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2"
dependencies = [
"memchr",
]
[[package]]
name = "quinn"
version = "0.11.2"
@ -5273,9 +5319,9 @@ dependencies = [
[[package]]
name = "rust-ini"
version = "0.18.0"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df"
checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091"
dependencies = [
"cfg-if",
"ordered-multimap",
@ -5283,32 +5329,36 @@ dependencies = [
[[package]]
name = "rust-s3"
version = "0.33.0"
version = "0.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b2ac5ff6acfbe74226fa701b5ef793aaa054055c13ebb7060ad36942956e027"
checksum = "c6679da8efaf4c6f0c161de0961dfe95fb6e9049c398d6fbdada2639f053aedb"
dependencies = [
"async-trait",
"aws-creds",
"aws-region",
"base64 0.13.1",
"base64 0.21.7",
"bytes",
"cfg-if",
"futures",
"hex",
"hmac 0.12.1",
"http 0.2.12",
"hyper 0.14.29",
"hyper-tls",
"log",
"maybe-async",
"md5",
"native-tls",
"percent-encoding",
"quick-xml 0.26.0",
"reqwest 0.11.27",
"quick-xml 0.30.0",
"serde",
"serde_derive",
"serde_json",
"sha2 0.10.8",
"thiserror",
"time",
"tokio",
"tokio-native-tls",
"tokio-stream",
"url",
]
@ -5391,18 +5441,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustls"
version = "0.20.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99"
dependencies = [
"log",
"ring 0.16.20",
"sct",
"webpki",
]
[[package]]
name = "rustls"
version = "0.21.12"
@ -5589,8 +5627,8 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f"
dependencies = [
"password-hash 0.5.0",
"pbkdf2 0.12.2",
"password-hash",
"pbkdf2",
"salsa20",
"sha2 0.10.8",
]
@ -5997,8 +6035,8 @@ dependencies = [
"hyper 1.3.1",
"hyper-util",
"idna 0.5.0",
"imagesize",
"infer",
"imagesize 0.12.0",
"infer 0.15.0",
"lazy_static",
"lru-cache",
"mail-auth",
@ -6145,7 +6183,7 @@ dependencies = [
"bitpacking",
"blake3",
"bytes",
"deadpool",
"deadpool 0.12.1",
"deadpool-postgres",
"elasticsearch",
"farmhash",
@ -6329,6 +6367,18 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tempfile"
version = "3.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1"
dependencies = [
"cfg-if",
"fastrand",
"rustix",
"windows-sys 0.52.0",
]
[[package]]
name = "term"
version = "0.7.0"
@ -6383,6 +6433,7 @@ dependencies = [
"pop3",
"rayon",
"reqwest 0.12.5",
"ring 0.17.8",
"rustls 0.22.4",
"rustls-pemfile 2.1.2",
"rustls-pki-types",
@ -6545,6 +6596,16 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-postgres"
version = "0.7.10"
@ -6983,12 +7044,12 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "url"
version = "2.5.1"
version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c25da092f0a868cdf09e8674cd3b7ef3a7d92a24253e663a2fb85e2496de56"
checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c"
dependencies = [
"form_urlencoded",
"idna 1.0.0",
"idna 0.5.0",
"percent-encoding",
]
@ -7232,15 +7293,6 @@ dependencies = [
"untrusted 0.9.0",
]
[[package]]
name = "webpki-roots"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
dependencies = [
"webpki",
]
[[package]]
name = "webpki-roots"
version = "0.25.4"
@ -7707,26 +7759,6 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "zip"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261"
dependencies = [
"aes",
"byteorder",
"bzip2",
"constant_time_eq 0.1.5",
"crc32fast",
"crossbeam-utils",
"flate2",
"hmac 0.12.1",
"pbkdf2 0.11.0",
"sha1",
"time",
"zstd 0.11.2+zstd.1.5.2",
]
[[package]]
name = "zip"
version = "2.1.3"
@ -7736,7 +7768,7 @@ dependencies = [
"aes",
"arbitrary",
"bzip2",
"constant_time_eq 0.3.0",
"constant_time_eq",
"crc32fast",
"crossbeam-utils",
"deflate64",
@ -7746,14 +7778,14 @@ dependencies = [
"indexmap 2.2.6",
"lzma-rs",
"memchr",
"pbkdf2 0.12.2",
"pbkdf2",
"rand",
"sha1",
"thiserror",
"time",
"zeroize",
"zopfli",
"zstd 0.13.1",
"zstd",
]
[[package]]
@ -7770,32 +7802,13 @@ dependencies = [
"simd-adler32",
]
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4"
dependencies = [
"zstd-safe 5.0.2+zstd.1.5.2",
]
[[package]]
name = "zstd"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a"
dependencies = [
"zstd-safe 7.1.0",
]
[[package]]
name = "zstd-safe"
version = "5.0.2+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db"
dependencies = [
"libc",
"zstd-sys",
"zstd-safe",
]
[[package]]

View file

@ -45,19 +45,20 @@ opentelemetry = { version = "0.22.0" }
opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.15.0", features = ["http-proto", "reqwest-client"] }
opentelemetry-semantic-conventions = { version = "0.14.0" }
imagesize = "0.12"
imagesize = "0.13"
sha1 = "0.10"
sha2 = "0.10.6"
md5 = "0.7.0"
whatlang = "0.16"
idna = "0.5"
idna = "1.0"
decancer = "3.0.1"
unicode-security = "0.1.0"
infer = "0.15.0"
infer = "0.16"
bincode = "1.3.1"
hostname = "0.4.0"
zip = "0.6.6"
zip = "2.1"
pwhash = "1.0.0"
xxhash-rust = { version = "0.8.5", features = ["xxh3"] }
[target.'cfg(unix)'.dependencies]
privdrop = "0.5.3"

View file

@ -1,3 +1,26 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::time::Duration;
use utils::config::{Config, Rate};

View file

@ -1,3 +1,26 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::sync::Arc;
use arc_swap::ArcSwap;
@ -5,7 +28,10 @@ use directory::{Directories, Directory};
use store::{BlobBackend, BlobStore, FtsStore, LookupStore, Store, Stores};
use utils::config::Config;
use crate::{expr::*, listener::tls::TlsManager, manager::config::ConfigManager, Core, Network};
use crate::{
expr::*, listener::tls::TlsManager, manager::config::ConfigManager, webhooks::Webhooks, Core,
Network,
};
use self::{
imap::ImapConfig, jmap::settings::JmapConfig, scripts::Scripting, smtp::SmtpConfig,
@ -130,6 +156,7 @@ impl Core {
jmap: JmapConfig::parse(config),
imap: ImapConfig::parse(config),
tls: TlsManager::parse(config),
web_hooks: Webhooks::parse(config),
storage: Storage {
data,
blob,

View file

@ -1,10 +1,41 @@
use utils::config::Config;
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{str::FromStr, time::Duration};
use crate::{
expr::{if_block::IfBlock, tokenizer::TokenMap},
listener::blocked::{AllowedIps, BlockedIps},
webhooks::{Webhook, WebhookType, Webhooks},
Network,
};
use ahash::AHashSet;
use base64::{engine::general_purpose::STANDARD, Engine};
use hyper::{
header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE},
HeaderMap,
};
use utils::config::Config;
use super::CONNECTION_VARS;
@ -40,3 +71,132 @@ impl Network {
network
}
}
impl Webhooks {
pub fn parse(config: &mut Config) -> Self {
let mut hooks = Webhooks {
events: Default::default(),
hooks: Default::default(),
};
for id in config
.sub_keys("webhook", ".url")
.map(|s| s.to_string())
.collect::<Vec<_>>()
{
if let Some(webhook) = parse_webhook(config, &id) {
hooks.events.extend(&webhook.events);
hooks.hooks.insert(webhook.id, webhook.into());
}
}
hooks
}
}
fn parse_webhook(config: &mut Config, id: &str) -> Option<Webhook> {
let mut headers = HeaderMap::new();
for (header, value) in config
.values(("webhook", id, "headers"))
.map(|(_, v)| {
if let Some((k, v)) = v.split_once(':') {
Ok((
HeaderName::from_str(k.trim()).map_err(|err| {
format!("Invalid header found in property \"webhook.{id}.headers\": {err}",)
})?,
HeaderValue::from_str(v.trim()).map_err(|err| {
format!("Invalid header found in property \"webhook.{id}.headers\": {err}",)
})?,
))
} else {
Err(format!(
"Invalid header found in property \"webhook.{id}.headers\": {v}",
))
}
})
.collect::<Result<Vec<(HeaderName, HeaderValue)>, String>>()
.map_err(|e| config.new_parse_error(("webhook", id, "headers"), e))
.unwrap_or_default()
{
headers.insert(header, value);
}
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
if let (Some(name), Some(secret)) = (
config.value(("webhook", id, "user")),
config.value(("webhook", id, "secret")),
) {
headers.insert(
AUTHORIZATION,
format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret)))
.parse()
.unwrap(),
);
}
// Parse webhook events
let mut events = AHashSet::new();
let mut parse_errors = Vec::new();
for (_, value) in config.values(("webhook", id, "events")) {
match WebhookType::from_str(value) {
Ok(event) => {
events.insert(event);
}
Err(err) => {
parse_errors.push(err);
}
}
}
if !parse_errors.is_empty() {
config.new_parse_error(
("webhook", id, "events"),
format!("Invalid webhook events: {}", parse_errors.join(", ")),
);
}
let url = config.value_require(("webhook", id, "url"))?.to_string();
Some(Webhook {
id: xxhash_rust::xxh3::xxh3_64(url.as_bytes()),
url,
timeout: config
.property_or_default(("webhook", id, "timeout"), "30s")
.unwrap_or_else(|| Duration::from_secs(30)),
tls_allow_invalid_certs: config
.property_or_default(("webhook", id, "allow-invalid-certs"), "false")
.unwrap_or_default(),
headers,
key: config
.value(("webhook", id, "signature-key"))
.unwrap_or_default()
.to_string(),
throttle: config
.property_or_default(("webhook", id, "throttle"), "1s")
.unwrap_or_else(|| Duration::from_secs(1)),
events,
})
}
impl FromStr for WebhookType {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"auth.success" => Ok(Self::AuthSuccess),
"auth.failure" => Ok(Self::AuthFailure),
"auth.banned" => Ok(Self::AuthBanned),
"auth.error" => Ok(Self::AuthError),
"message.accepted" => Ok(Self::MessageAccepted),
"message.rejected" => Ok(Self::MessageRejected),
"message.appended" => Ok(Self::MessageAppended),
"account.over-quota" => Ok(Self::AccountOverQuota),
"dsn" => Ok(Self::DSN),
"double-bounce" => Ok(Self::DoubleBounce),
"report.incoming.dmarc" => Ok(Self::IncomingDmarcReport),
"report.incoming.tls" => Ok(Self::IncomingTlsReport),
"report.incoming.arf" => Ok(Self::IncomingArfReport),
"report.outgoing" => Ok(Self::OutgoingReport),
_ => Err(s.to_string()),
}
}
}

View file

@ -1,3 +1,26 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{
collections::HashSet,
sync::Arc,
@ -24,6 +47,9 @@ pub struct Scripting {
pub return_path: IfBlock,
pub sign: IfBlock,
pub scripts: AHashMap<String, Arc<Sieve>>,
}
pub struct ScriptCache {
pub bayes_cache: BayesTokenCache,
pub remote_lists: RwLock<AHashMap<String, RemoteList>>,
}
@ -307,29 +333,32 @@ impl Scripting {
"'MAILER-DAEMON@' + key_get('default', 'domain')",
)
}),
from_name: IfBlock::try_parse(config, "sieve.trusted.from-name", &token_map)
from_name: IfBlock::try_parse(config, "sieve.trusted.from-name", &token_map)
.unwrap_or_else(|| {
IfBlock::new::<()>(
"sieve.trusted.from-name",
[],
"'Automated Message'",
)
IfBlock::new::<()>("sieve.trusted.from-name", [], "'Automated Message'")
}),
return_path: IfBlock::try_parse(config, "sieve.trusted.return-path", &token_map)
.unwrap_or_else(|| {
IfBlock::empty(
"sieve.trusted.return-path",
)
}),
sign: IfBlock::try_parse(config, "sieve.trusted.sign", &token_map)
.unwrap_or_else(|| {
return_path: IfBlock::try_parse(config, "sieve.trusted.return-path", &token_map)
.unwrap_or_else(|| IfBlock::empty("sieve.trusted.return-path")),
sign: IfBlock::try_parse(config, "sieve.trusted.sign", &token_map).unwrap_or_else(
|| {
IfBlock::new::<()>(
"sieve.trusted.sign",
[],
"['rsa-' + key_get('default', 'domain'), 'ed25519-' + key_get('default', 'domain')]",
concat!(
"['rsa-' + key_get('default', 'domain'), ",
"'ed25519-' + key_get('default', 'domain')]"
),
)
}),
},
),
scripts,
}
}
}
impl ScriptCache {
pub fn parse(config: &mut Config) -> Self {
ScriptCache {
bayes_cache: BayesTokenCache::new(
config
.property_or_default("cache.bayes.capacity", "8192")
@ -362,9 +391,19 @@ impl Default for Scripting {
sign: IfBlock::new::<()>(
"sieve.trusted.sign",
[],
"['rsa-' + key_get('default', 'domain'), 'ed25519-' + key_get('default', 'domain')]",
concat!(
"['rsa-' + key_get('default', 'domain'), ",
"'ed25519-' + key_get('default', 'domain')]"
),
),
scripts: AHashMap::new(),
}
}
}
impl Default for ScriptCache {
fn default() -> Self {
Self {
bayes_cache: BayesTokenCache::new(
8192,
Duration::from_secs(3600),
@ -386,8 +425,6 @@ impl Clone for Scripting {
return_path: self.return_path.clone(),
sign: self.sign.clone(),
scripts: self.scripts.clone(),
bayes_cache: self.bayes_cache.clone(),
remote_lists: RwLock::new(self.remote_lists.read().clone()),
}
}
}

View file

@ -1,6 +1,7 @@
use std::{fmt::Display, net::SocketAddr, time::Duration};
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
use tokio::net::TcpSocket;
use utils::config::ipmask::IpAddrMask;
@ -36,7 +37,7 @@ pub struct Listener {
pub nodelay: bool,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)]
pub enum ServerProtocol {
#[default]
Smtp,

View file

@ -6,7 +6,10 @@ use std::{
use ahash::AHashSet;
use base64::{engine::general_purpose::STANDARD, Engine};
use hyper::{header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, HeaderMap};
use hyper::{
header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE},
HeaderMap,
};
use smtp_proto::*;
use utils::config::{utils::ParseValue, Config};
@ -203,14 +206,14 @@ impl SessionConfig {
session.rcpt.catch_all = AddressMapping::parse(config, "session.rcpt.catch-all");
session.rcpt.subaddressing = AddressMapping::parse(config, "session.rcpt.sub-addressing");
session.milters = config
.sub_keys("session.milter", "")
.sub_keys("session.milter", ".hostname")
.map(|s| s.to_string())
.collect::<Vec<_>>()
.into_iter()
.filter_map(|id| parse_milter(config, &id, &has_rcpt_vars))
.collect();
session.jmilters = config
.sub_keys("session.jmilter", "")
.sub_keys("session.jmilter", ".url")
.map(|s| s.to_string())
.collect::<Vec<_>>()
.into_iter()
@ -589,20 +592,17 @@ fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<
HeaderName::from_str(k.trim()).map_err(|err| {
format!(
"Invalid header found in property \"session.jmilter.{id}.headers\": {err}",
)
})?,
HeaderValue::from_str(v.trim()).map_err(|err| {
format!(
"Invalid header found in property \"session.jmilter.{id}.headers\": {err}",
)
})?,
))
} else {
Err(format!(
"Invalid header found in property \"session.jmilter.{id}.headers\": {v}",
))
}
})
@ -614,8 +614,16 @@ fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<
}
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
if let (Some(name), Some(secret)) = (config.value(("session.jmilter", id, "user")), config.value(("session.jmilter", id, "secret"))) {
headers.insert(AUTHORIZATION, format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret))).parse().unwrap());
if let (Some(name), Some(secret)) = (
config.value(("session.jmilter", id, "user")),
config.value(("session.jmilter", id, "secret")),
) {
headers.insert(
AUTHORIZATION,
format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret)))
.parse()
.unwrap(),
);
}
Some(JMilter {

View file

@ -1,3 +1,26 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::sync::Arc;
use ahash::AHashMap;

View file

@ -1,3 +1,26 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{collections::HashMap, str::FromStr};
use opentelemetry_otlp::{HttpExporterBuilder, TonicExporterBuilder, WithExportConfig};

View file

@ -28,6 +28,7 @@ use config::{
imap::ImapConfig,
jmap::settings::JmapConfig,
scripts::Scripting,
server::ServerProtocol,
smtp::{
auth::{ArcSealer, DkimSigner},
queue::RelayHost,
@ -36,7 +37,7 @@ use config::{
storage::Storage,
tracers::{OtelTracer, Tracer, Tracers},
};
use directory::{core::secret::verify_secret_hash, Directory, Principal, QueryBy};
use directory::{core::secret::verify_secret_hash, Directory, Principal, QueryBy, Type};
use expr::if_block::IfBlock;
use listener::{
blocked::{AllowedIps, BlockedIps},
@ -51,12 +52,13 @@ use opentelemetry_sdk::{
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
use sieve::Sieve;
use store::LookupStore;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{
layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry,
};
use utils::{config::Config, BlobHash};
use webhooks::{manager::WebhookEvent, WebhookPayload, WebhookType, Webhooks};
pub mod addresses;
pub mod config;
@ -64,10 +66,13 @@ pub mod expr;
pub mod listener;
pub mod manager;
pub mod scripts;
pub mod webhooks;
pub static USER_AGENT: &str = concat!("StalwartMail/", env!("CARGO_PKG_VERSION"),);
pub static DAEMON_NAME: &str = concat!("Stalwart Mail Server v", env!("CARGO_PKG_VERSION"),);
pub const IPC_CHANNEL_BUFFER: usize = 1024;
pub type SharedCore = Arc<ArcSwap<Core>>;
#[derive(Clone, Default)]
@ -79,6 +84,7 @@ pub struct Core {
pub smtp: SmtpConfig,
pub jmap: JmapConfig,
pub imap: ImapConfig,
pub web_hooks: Webhooks,
}
#[derive(Clone)]
@ -103,6 +109,11 @@ pub enum DeliveryEvent {
Stop,
}
pub struct Ipc {
pub delivery_tx: mpsc::Sender<DeliveryEvent>,
pub webhook_tx: mpsc::Sender<WebhookEvent>,
}
#[derive(Debug)]
pub struct IngestMessage {
pub sender_address: String,
@ -230,8 +241,10 @@ impl Core {
pub async fn authenticate(
&self,
directory: &Directory,
ipc: &Ipc,
credentials: &Credentials<String>,
remote_ip: IpAddr,
protocol: ServerProtocol,
return_member_of: bool,
) -> directory::Result<AuthResult<Principal<u32>>> {
// First try to authenticate the user against the default directory
@ -239,7 +252,24 @@ impl Core {
.query(QueryBy::Credentials(credentials), return_member_of)
.await
{
Ok(Some(principal)) => return Ok(AuthResult::Success(principal)),
Ok(Some(principal)) => {
// Send webhook event
if self.has_webhook_subscribers(WebhookType::AuthSuccess) {
ipc.send_webhook(
WebhookType::AuthSuccess,
WebhookPayload::Authentication {
login: credentials.login().to_string(),
protocol,
remote_ip,
typ: principal.typ.into(),
as_master: None,
},
)
.await;
}
return Ok(AuthResult::Success(principal));
}
Ok(None) => Ok(()),
Err(err) => Err(err),
};
@ -254,6 +284,20 @@ impl Core {
if username == fallback_admin =>
{
if verify_secret_hash(fallback_pass, secret).await {
// Send webhook event
if self.has_webhook_subscribers(WebhookType::AuthSuccess) {
ipc.send_webhook(
WebhookType::AuthSuccess,
WebhookPayload::Authentication {
login: username.to_string(),
protocol,
remote_ip,
typ: Type::Superuser.into(),
as_master: None,
},
)
.await;
}
return Ok(AuthResult::Success(Principal::fallback_admin(
fallback_pass,
)));
@ -270,8 +314,37 @@ impl Core {
.query(QueryBy::Name(username), return_member_of)
.await?
{
// Send webhook event
if self.has_webhook_subscribers(WebhookType::AuthSuccess) {
ipc.send_webhook(
WebhookType::AuthSuccess,
WebhookPayload::Authentication {
login: username.to_string(),
protocol,
remote_ip,
typ: principal.typ.into(),
as_master: true.into(),
},
)
.await;
}
AuthResult::Success(principal)
} else {
// Send webhook event
if self.has_webhook_subscribers(WebhookType::AuthFailure) {
ipc.send_webhook(
WebhookType::AuthFailure,
WebhookPayload::Authentication {
login: username.to_string(),
protocol,
remote_ip,
typ: None,
as_master: true.into(),
},
)
.await;
}
AuthResult::Failure
},
);
@ -281,13 +354,20 @@ impl Core {
}
if let Err(err) = result {
// Send webhook event
if self.has_webhook_subscribers(WebhookType::AuthError) {
ipc.send_webhook(
WebhookType::AuthError,
WebhookPayload::Error {
message: err.to_string(),
},
)
.await;
}
Err(err)
} else if self.has_fail2ban() {
let login = match credentials {
Credentials::Plain { username, .. }
| Credentials::XOauth2 { username, .. }
| Credentials::OAuthBearer { token: username } => username,
};
let login = credentials.login();
if self.is_fail2banned(remote_ip, login.to_string()).await? {
tracing::info!(
context = "directory",
@ -297,11 +377,55 @@ impl Core {
"IP address blocked after too many failed login attempts",
);
// Send webhook event
if self.has_webhook_subscribers(WebhookType::AuthBanned) {
ipc.send_webhook(
WebhookType::AuthBanned,
WebhookPayload::Authentication {
login: credentials.login().to_string(),
protocol,
remote_ip,
typ: None,
as_master: None,
},
)
.await;
}
Ok(AuthResult::Banned)
} else {
// Send webhook event
if self.has_webhook_subscribers(WebhookType::AuthFailure) {
ipc.send_webhook(
WebhookType::AuthFailure,
WebhookPayload::Authentication {
login: credentials.login().to_string(),
protocol,
remote_ip,
typ: None,
as_master: None,
},
)
.await;
}
Ok(AuthResult::Failure)
}
} else {
// Send webhook event
if self.has_webhook_subscribers(WebhookType::AuthFailure) {
ipc.send_webhook(
WebhookType::AuthFailure,
WebhookPayload::Authentication {
login: credentials.login().to_string(),
protocol,
remote_ip,
typ: None,
as_master: None,
},
)
.await;
}
Ok(AuthResult::Failure)
}
}
@ -421,3 +545,17 @@ impl Tracers {
}
}
}
trait CredentialsUsername {
fn login(&self) -> &str;
}
impl CredentialsUsername for Credentials<String> {
fn login(&self) -> &str {
match self {
Credentials::Plain { username, .. }
| Credentials::XOauth2 { username, .. }
| Credentials::OAuthBearer { token: username } => username,
}
}
}

View file

@ -21,11 +21,7 @@
* for more details.
*/
use std::{
fmt::Debug,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::atomic::AtomicU8,
};
use std::{fmt::Debug, net::IpAddr, sync::atomic::AtomicU8};
use ahash::AHashSet;
use parking_lot::RwLock;
@ -113,9 +109,12 @@ impl AllowedIps {
}
}
// Add loopback addresses
ip_addresses.insert(IpAddr::V4(Ipv4Addr::LOCALHOST));
ip_addresses.insert(IpAddr::V6(Ipv6Addr::LOCALHOST));
#[cfg(not(feature = "test_mode"))]
{
// Add loopback addresses
ip_addresses.insert(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
ip_addresses.insert(IpAddr::V6(std::net::IIpv6Addr::LOCALHOST));
}
AllowedIps {
ip_addresses,
@ -210,14 +209,18 @@ impl Default for BlockedIps {
}
}
#[allow(clippy::derivable_impls)]
impl Default for AllowedIps {
fn default() -> Self {
// Add IPv4 and IPv6 loopback addresses
Self {
#[cfg(not(feature = "test_mode"))]
ip_addresses: AHashSet::from_iter([
IpAddr::V4(Ipv4Addr::LOCALHOST),
IpAddr::V6(Ipv6Addr::LOCALHOST),
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
IpAddr::V6(std::net::Ipv6Addr::LOCALHOST),
]),
#[cfg(feature = "test_mode")]
ip_addresses: Default::default(),
ip_networks: Default::default(),
has_networks: Default::default(),
}

View file

@ -23,7 +23,6 @@
use ahash::AHashSet;
use arc_swap::ArcSwap;
use parking_lot::RwLock;
use store::Stores;
use utils::config::{ipmask::IpAddrOrMask, utils::ParseValue, Config};
@ -133,9 +132,6 @@ impl Core {
if !config.errors.is_empty() {
return Ok(config.into());
}
// Transfer Sieve cache
core.sieve.bayes_cache = self.sieve.bayes_cache.clone();
core.sieve.remote_lists = RwLock::new(self.sieve.remote_lists.read().clone());
// Copy ACME certificates
let mut certificates = core.tls.certificates.load().as_ref().clone();

View file

@ -33,14 +33,13 @@ pub fn fn_img_metadata<'x>(ctx: &'x Context<'x>, v: Vec<Variable>) -> Variable {
"type" => imagesize::image_type(bytes).ok().map(|t| {
Variable::from(match t {
imagesize::ImageType::Aseprite => "aseprite",
imagesize::ImageType::Avif => "avif",
imagesize::ImageType::Bmp => "bmp",
imagesize::ImageType::Dds => "dds",
imagesize::ImageType::Exr => "exr",
imagesize::ImageType::Farbfeld => "farbfeld",
imagesize::ImageType::Gif => "gif",
imagesize::ImageType::Hdr => "hdr",
imagesize::ImageType::Heif => "heif",
imagesize::ImageType::Heif(_) => "heif",
imagesize::ImageType::Ico => "ico",
imagesize::ImageType::Jpeg => "jpeg",
imagesize::ImageType::Jxl => "jxl",
@ -53,6 +52,8 @@ pub fn fn_img_metadata<'x>(ctx: &'x Context<'x>, v: Vec<Variable>) -> Variable {
imagesize::ImageType::Tiff => "tiff",
imagesize::ImageType::Vtf => "vtf",
imagesize::ImageType::Webp => "webp",
imagesize::ImageType::Ilbm => "ilbm",
_ => "unknown",
})
}),
"width" => imagesize::blob_size(bytes)

View file

@ -116,7 +116,7 @@ async fn train(ctx: PluginContext<'_>, is_train: bool) -> Variable {
);
// Update weight and invalidate cache
let bayes_cache = &ctx.core.sieve.bayes_cache;
let bayes_cache = &ctx.cache.bayes_cache;
if is_train {
for (hash, weights) in model.weights {
if store
@ -209,7 +209,7 @@ pub async fn exec_classify(ctx: PluginContext<'_>) -> Variable {
}
// Obtain training counts
let bayes_cache = &ctx.core.sieve.bayes_cache;
let bayes_cache = &ctx.cache.bayes_cache;
let (spam_learns, ham_learns) =
if let Some(weights) = bayes_cache.get_or_update(TokenHash::default(), store).await {
(weights.spam, weights.ham)
@ -286,7 +286,7 @@ pub async fn exec_is_balanced(ctx: PluginContext<'_>) -> Variable {
let learn_spam = ctx.arguments[1].to_bool();
// Obtain training counts
let bayes_cache = &ctx.core.sieve.bayes_cache;
let bayes_cache = &ctx.cache.bayes_cache;
let (spam_learns, ham_learns) =
if let Some(weights) = bayes_cache.get_or_update(TokenHash::default(), store).await {
(weights.spam as f64, weights.ham as f64)

View file

@ -180,7 +180,7 @@ pub async fn exec_remote(ctx: PluginContext<'_>) -> Variable {
const MAX_ENTRY_SIZE: usize = 256;
const MAX_ENTRIES: usize = 100000;
match ctx.core.sieve.remote_lists.read().get(resource.as_ref()) {
match ctx.cache.remote_lists.read().get(resource.as_ref()) {
Some(remote_list) if remote_list.expires < Instant::now() => {
return remote_list.entries.contains(item.as_ref()).into()
}
@ -245,7 +245,7 @@ pub async fn exec_remote(ctx: PluginContext<'_>) -> Variable {
};
// Lock remote list for writing
let mut _lock = ctx.core.sieve.remote_lists.write();
let mut _lock = ctx.cache.remote_lists.write();
let list = _lock
.entry(resource.to_string())
.or_insert_with(|| RemoteList {
@ -369,7 +369,7 @@ pub async fn exec_remote(ctx: PluginContext<'_>) -> Variable {
}
// Something went wrong, try again in one hour
let mut _lock = ctx.core.sieve.remote_lists.write();
let mut _lock = ctx.cache.remote_lists.write();
let list = _lock
.entry(resource.to_string())
.or_insert_with(|| RemoteList {

View file

@ -34,7 +34,7 @@ pub mod text;
use mail_parser::Message;
use sieve::{runtime::Variable, FunctionMap, Input};
use crate::Core;
use crate::{config::scripts::ScriptCache, Core};
use super::ScriptModification;
@ -43,6 +43,7 @@ type RegisterPluginFnc = fn(u32, &mut FunctionMap) -> ();
pub struct PluginContext<'x> {
pub span: &'x tracing::Span,
pub core: &'x Core,
pub cache: &'x ScriptCache,
pub message: &'x Message<'x>,
pub modifications: &'x mut Vec<ScriptModification>,
pub arguments: Vec<Variable>,

View file

@ -0,0 +1,50 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::sync::Arc;
use crate::{Core, Ipc};
use super::{manager::WebhookEvent, WebhookPayload, WebhookType};
impl Core {
#[inline(always)]
pub fn has_webhook_subscribers(&self, event_type: WebhookType) -> bool {
self.web_hooks.events.contains(&event_type)
}
}
impl Ipc {
pub async fn send_webhook(&self, event_type: WebhookType, payload: WebhookPayload) {
if let Err(err) = self
.webhook_tx
.send(WebhookEvent::Send {
typ: event_type,
payload: Arc::new(payload),
})
.await
{
tracing::warn!("Failed to send webhook event: {:?}", err);
}
}
}

View file

@ -0,0 +1,268 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{
sync::Arc,
time::{Duration, Instant},
};
use crate::{SharedCore, IPC_CHANNEL_BUFFER};
use ahash::AHashMap;
use base64::{engine::general_purpose::STANDARD, Engine};
use chrono::Utc;
use ring::hmac;
use tokio::sync::mpsc;
use utils::snowflake::SnowflakeIdGenerator;
use super::{Webhook, WebhookEvents, WebhookPayload, WebhookType};
pub enum WebhookEvent {
Send {
typ: WebhookType,
payload: Arc<WebhookPayload>,
},
Success {
webhook_id: u64,
},
Retry {
webhook_id: u64,
events: WebhookEvents,
},
Stop,
}
pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365);
struct PendingEvents {
next_delivery: Instant,
pending_events: WebhookEvents,
retry_num: u32,
in_flight: bool,
}
pub fn spawn_webhook_manager(core: SharedCore) -> mpsc::Sender<WebhookEvent> {
let (webhook_tx, mut webhook_rx) = mpsc::channel(IPC_CHANNEL_BUFFER);
let webhook_tx_ = webhook_tx.clone();
tokio::spawn(async move {
let mut wakeup_time = LONG_SLUMBER;
let mut pending_events: AHashMap<u64, PendingEvents> = AHashMap::new();
let id_generator = SnowflakeIdGenerator::new();
loop {
// Wait for the next event or timeout
let event_or_timeout = tokio::time::timeout(wakeup_time, webhook_rx.recv()).await;
// Load settings
let core = core.load();
match event_or_timeout {
Ok(Some(event)) => match event {
WebhookEvent::Send { typ, payload } => {
for (webhook_id, webhook) in &core.web_hooks.hooks {
if webhook.events.contains(&typ) {
pending_events.entry(*webhook_id).or_default().push(
super::WebhookEvent {
id: id_generator.generate().unwrap_or_default(),
created_at: Utc::now(),
typ,
data: payload.clone(),
},
);
}
}
}
WebhookEvent::Success { webhook_id } => {
if let Some(pending_events) = pending_events.get_mut(&webhook_id) {
pending_events.success();
}
}
WebhookEvent::Retry { webhook_id, events } => {
pending_events.entry(webhook_id).or_default().retry(events);
}
WebhookEvent::Stop => break,
},
Ok(None) => {
break;
}
Err(_) => (),
}
// Process events
let mut delete_ids = Vec::new();
let mut next_retry = None;
for (webhook_id, events) in &mut pending_events {
if let Some(webhook) = core.web_hooks.hooks.get(webhook_id) {
if events.next_delivery <= Instant::now() {
if !events.is_empty() {
events.next_delivery = Instant::now() + webhook.throttle;
if !events.in_flight {
events.in_flight = true;
spawn_webhook_handler(
webhook.clone(),
events.take_events(),
webhook_tx.clone(),
);
}
} else {
// No more events for webhook
delete_ids.push(*webhook_id);
}
} else if !events.is_empty() {
// Retry later
let this_retry = events.next_delivery - Instant::now();
match next_retry {
Some(next_retry) if this_retry >= next_retry => {}
_ => {
next_retry = Some(this_retry);
}
}
}
} else {
delete_ids.push(*webhook_id);
}
}
wakeup_time = next_retry.unwrap_or(LONG_SLUMBER);
// Delete removed or empty webhooks
for webhook_id in delete_ids {
pending_events.remove(&webhook_id);
}
}
});
webhook_tx_
}
fn spawn_webhook_handler(
webhook: Arc<Webhook>,
events: WebhookEvents,
webhook_tx: mpsc::Sender<WebhookEvent>,
) {
tokio::spawn(async move {
let response = match post_webhook_events(&webhook, &events).await {
Ok(_) => WebhookEvent::Success {
webhook_id: webhook.id,
},
Err(err) => {
tracing::warn!("Failed to post webhook events: {}", err);
WebhookEvent::Retry {
webhook_id: webhook.id,
events,
}
}
};
// Notify manager
if let Err(err) = webhook_tx.send(response).await {
tracing::error!("Failed to send webhook event: {}", err);
}
});
}
async fn post_webhook_events(webhook: &Webhook, events: &WebhookEvents) -> Result<(), String> {
// Serialize body
let body = serde_json::to_string(events)
.map_err(|err| format!("Failed to serialize events: {}", err))?;
// Add HMAC-SHA256 signature
let mut headers = webhook.headers.clone();
if !webhook.key.is_empty() {
let key = hmac::Key::new(hmac::HMAC_SHA256, webhook.key.as_bytes());
let tag = hmac::sign(&key, body.as_bytes());
headers.insert(
"X-Signature",
STANDARD.encode(tag.as_ref()).parse().unwrap(),
);
}
// Send request
let response = reqwest::Client::builder()
.timeout(webhook.timeout)
.danger_accept_invalid_certs(webhook.tls_allow_invalid_certs)
.build()
.map_err(|err| format!("Failed to create HTTP client: {}", err))?
.post(&webhook.url)
.headers(headers)
.body(body)
.send()
.await
.map_err(|err| format!("Webhook request to {} failed: {err}", webhook.url))?;
if response.status().is_success() {
Ok(())
} else {
Err(format!(
"Webhook request to {} failed with code {}: {}",
webhook.url,
response.status().as_u16(),
response.status().canonical_reason().unwrap_or("Unknown")
))
}
}
impl Default for PendingEvents {
fn default() -> Self {
Self {
next_delivery: Instant::now(),
pending_events: WebhookEvents::default(),
retry_num: 0,
in_flight: false,
}
}
}
impl PendingEvents {
pub fn push(&mut self, event: super::WebhookEvent) {
self.pending_events.events.push(event);
}
pub fn success(&mut self) {
self.in_flight = false;
self.retry_num = 0;
}
pub fn retry(&mut self, events: WebhookEvents) {
// Backoff
self.next_delivery = Instant::now() + Duration::from_secs(2u64.pow(self.retry_num));
self.retry_num += 1;
self.in_flight = false;
for event in events.events {
// Drop failed events older than 5 minutes
if event.created_at + Duration::from_secs(5 * 60) >= Utc::now() {
self.pending_events.events.push(event);
}
}
}
pub fn is_empty(&self) -> bool {
self.pending_events.events.is_empty()
}
pub fn take_events(&mut self) -> WebhookEvents {
std::mem::take(&mut self.pending_events)
}
}

View file

@ -0,0 +1,351 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{net::IpAddr, sync::Arc, time::Duration};
use ahash::{AHashMap, AHashSet};
use chrono::{DateTime, Utc};
use hyper::HeaderMap;
use mail_auth::report::{
tlsrpt::{PolicyType, ResultType},
AuthFailureType, DeliveryResult, FeedbackType, IdentityAlignment,
};
use serde::{Deserialize, Serialize};
use crate::config::server::ServerProtocol;
pub mod collector;
pub mod manager;
#[derive(Clone, Default)]
pub struct Webhooks {
pub events: AHashSet<WebhookType>,
pub hooks: AHashMap<u64, Arc<Webhook>>,
}
#[derive(Clone)]
pub struct Webhook {
pub id: u64,
pub url: String,
pub key: String,
pub timeout: Duration,
pub throttle: Duration,
pub tls_allow_invalid_certs: bool,
pub headers: HeaderMap,
pub events: AHashSet<WebhookType>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct WebhookEvents {
pub events: Vec<WebhookEvent>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct WebhookEvent {
pub id: u64,
#[serde(rename = "createdAt")]
pub created_at: DateTime<Utc>,
#[serde(rename = "type")]
pub typ: WebhookType,
pub data: Arc<WebhookPayload>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WebhookType {
#[serde(rename = "auth.success")]
AuthSuccess,
#[serde(rename = "auth.failure")]
AuthFailure,
#[serde(rename = "auth.banned")]
AuthBanned,
#[serde(rename = "auth.error")]
AuthError,
#[serde(rename = "message.accepted")]
MessageAccepted,
#[serde(rename = "message.rejected")]
MessageRejected,
#[serde(rename = "message.appended")]
MessageAppended,
#[serde(rename = "account.over-quota")]
AccountOverQuota,
#[serde(rename = "dsn")]
DSN,
#[serde(rename = "double-bounce")]
DoubleBounce,
#[serde(rename = "report.incoming.dmarc")]
IncomingDmarcReport,
#[serde(rename = "report.incoming.tls")]
IncomingTlsReport,
#[serde(rename = "report.incoming.arf")]
IncomingArfReport,
#[serde(rename = "report.outgoing")]
OutgoingReport,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum WebhookPayload {
Authentication {
login: String,
protocol: ServerProtocol,
#[serde(rename = "remoteIp")]
remote_ip: IpAddr,
#[serde(rename = "accountType")]
#[serde(skip_serializing_if = "Option::is_none")]
typ: Option<directory::Type>,
#[serde(rename = "isMasterLogin")]
#[serde(skip_serializing_if = "Option::is_none")]
as_master: Option<bool>,
},
Error {
message: String,
},
MessageAccepted {
#[serde(rename = "queueId")]
id: u64,
#[serde(rename = "remoteIp")]
#[serde(skip_serializing_if = "Option::is_none")]
remote_ip: Option<IpAddr>,
#[serde(rename = "localPort")]
#[serde(skip_serializing_if = "Option::is_none")]
local_port: Option<u16>,
#[serde(rename = "authenticatedAs")]
#[serde(skip_serializing_if = "Option::is_none")]
authenticated_as: Option<String>,
#[serde(rename = "returnPath")]
return_path: String,
recipients: Vec<String>,
#[serde(rename = "nextRetry")]
next_retry: String,
#[serde(rename = "nextDSN")]
next_dsn: String,
expires: String,
size: usize,
},
MessageRejected {
reason: WebhookMessageFailure,
#[serde(rename = "remoteIp")]
remote_ip: IpAddr,
#[serde(rename = "localPort")]
local_port: u16,
#[serde(rename = "authenticatedAs")]
#[serde(skip_serializing_if = "Option::is_none")]
authenticated_as: Option<String>,
#[serde(rename = "returnPath")]
#[serde(skip_serializing_if = "Option::is_none")]
return_path: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
recipients: Vec<String>,
},
MessageAppended {
#[serde(rename = "accountId")]
account_id: u32,
#[serde(rename = "mailboxIds")]
mailbox_ids: Vec<u32>,
source: WebhookIngestSource,
encrypt: bool,
size: usize,
},
DSN {
#[serde(rename = "queueId")]
id: u64,
sender: String,
status: Vec<WebhookDSN>,
#[serde(rename = "createdAt")]
created: String,
},
IncomingDmarcReport {
#[serde(rename = "rangeFrom")]
range_from: String,
#[serde(rename = "rangeTo")]
range_to: String,
domain: String,
#[serde(rename = "reportEmail")]
report_email: String,
#[serde(rename = "reportId")]
report_id: String,
#[serde(rename = "dmarcPass")]
dmarc_pass: u32,
#[serde(rename = "dmarcQuarantine")]
dmarc_quarantine: u32,
#[serde(rename = "dmarcReject")]
dmarc_reject: u32,
#[serde(rename = "dmarcNone")]
dmarc_none: u32,
#[serde(rename = "dkimPass")]
dkim_pass: u32,
#[serde(rename = "dkimFail")]
dkim_fail: u32,
#[serde(rename = "dkimNone")]
dkim_none: u32,
#[serde(rename = "spfPass")]
spf_pass: u32,
#[serde(rename = "spfFail")]
spf_fail: u32,
#[serde(rename = "spfNone")]
spf_none: u32,
},
IncomingTlsReport {
policies: Vec<WebhookTlsPolicy>,
},
IncomingArfReport {
#[serde(rename = "feedbackType")]
feedback_type: FeedbackType,
#[serde(rename = "arrivalDate")]
#[serde(skip_serializing_if = "Option::is_none")]
arrival_date: Option<String>,
#[serde(rename = "authenticationResults")]
#[serde(skip_serializing_if = "Vec::is_empty")]
authentication_results: Vec<String>,
incidents: u32,
#[serde(rename = "reportedDomains")]
#[serde(skip_serializing_if = "Vec::is_empty")]
reported_domain: Vec<String>,
#[serde(rename = "reportedUris")]
#[serde(skip_serializing_if = "Vec::is_empty")]
reported_uri: Vec<String>,
#[serde(rename = "reportingMTA")]
#[serde(skip_serializing_if = "Option::is_none")]
reporting_mta: Option<String>,
#[serde(rename = "sourceIp")]
#[serde(skip_serializing_if = "Option::is_none")]
source_ip: Option<IpAddr>,
#[serde(rename = "userAgent")]
#[serde(skip_serializing_if = "Option::is_none")]
user_agent: Option<String>,
#[serde(rename = "authFailureType")]
#[serde(skip_serializing_if = "has_no_auth_failure")]
auth_failure: AuthFailureType,
#[serde(rename = "deliveryResult")]
#[serde(skip_serializing_if = "has_no_delivery_result")]
delivery_result: DeliveryResult,
#[serde(rename = "dkimDomain")]
#[serde(skip_serializing_if = "Option::is_none")]
dkim_domain: Option<String>,
#[serde(rename = "dkimIdentity")]
#[serde(skip_serializing_if = "Option::is_none")]
dkim_identity: Option<String>,
#[serde(rename = "dkimSelector")]
#[serde(skip_serializing_if = "Option::is_none")]
dkim_selector: Option<String>,
#[serde(rename = "identityAlignment")]
#[serde(skip_serializing_if = "has_no_alignment")]
identity_alignment: IdentityAlignment,
},
AccountOverQuota {
#[serde(rename = "accountId")]
account_id: u32,
#[serde(rename = "quotaLimit")]
quota_limit: usize,
#[serde(rename = "quotaUsed")]
quota_used: usize,
#[serde(rename = "objectSize")]
object_size: usize,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct WebhookTlsPolicy {
#[serde(rename = "rangeFrom")]
pub range_from: String,
#[serde(rename = "rangeTo")]
pub range_to: String,
pub domain: String,
#[serde(rename = "reportContact")]
#[serde(skip_serializing_if = "Option::is_none")]
pub report_contact: Option<String>,
#[serde(rename = "reportId")]
pub report_id: String,
#[serde(rename = "policyType")]
pub policy_type: PolicyType,
#[serde(rename = "totalSuccesses")]
pub total_successes: u32,
#[serde(rename = "totalFailures")]
pub total_failures: u32,
pub details: AHashMap<ResultType, u32>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct WebhookDSN {
pub address: String,
#[serde(rename = "remoteHost")]
#[serde(skip_serializing_if = "Option::is_none")]
pub remote_host: Option<String>,
#[serde(rename = "type")]
pub typ: WebhookDSNType,
pub message: String,
#[serde(rename = "nextRetry")]
#[serde(skip_serializing_if = "Option::is_none")]
pub next_retry: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub expires: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "retryCount")]
pub retry_count: Option<u32>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum WebhookDSNType {
Success,
TemporaryFailure,
PermanentFailure,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[serde(rename_all = "camelCase")]
pub enum WebhookMessageFailure {
ParseFailed,
LoopDetected,
DkimPolicy,
ArcPolicy,
DmarcPolicy,
MilterReject,
SieveDiscard,
SieveReject,
QuotaExceeded,
ServerFailure,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[serde(rename_all = "lowercase")]
pub enum WebhookIngestSource {
Smtp,
Jmap,
Imap,
}
fn has_no_alignment(alignment: &IdentityAlignment) -> bool {
matches!(
alignment,
IdentityAlignment::None | IdentityAlignment::Unspecified
)
}
fn has_no_delivery_result(result: &DeliveryResult) -> bool {
matches!(result, DeliveryResult::Unspecified)
}
fn has_no_auth_failure(failure: &AuthFailureType) -> bool {
matches!(failure, AuthFailureType::Unspecified)
}

View file

@ -368,7 +368,9 @@ impl LdapMappings {
"posixaccount" | "individual" | "person" | "inetorgperson" => {
principal.typ = Type::Individual
}
"posixgroup" | "groupofuniquenames" | "group" => principal.typ = Type::Group,
"posixgroup" | "groupofuniquenames" | "group" => {
principal.typ = Type::Group
}
_ => continue,
}
break;

View file

@ -22,7 +22,10 @@
*/
use core::cache::CachedDirectory;
use std::{fmt::Debug, sync::Arc};
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use ahash::AHashMap;
use backend::{
@ -311,3 +314,18 @@ impl PartialEq for DirectoryError {
}
}
}
impl Display for DirectoryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Ldap(error) => write!(f, "LDAP error: {}", error),
Self::Store(error) => write!(f, "Store error: {}", error),
Self::Imap(error) => write!(f, "IMAP error: {}", error),
Self::Smtp(error) => write!(f, "SMTP error: {}", error),
Self::Pool(error) => write!(f, "Pool error: {}", error),
Self::Management(error) => write!(f, "Management error: {:?}", error),
Self::TimedOut => write!(f, "Directory timed out"),
Self::Unsupported => write!(f, "Method not supported by directory"),
}
}
}

View file

@ -31,7 +31,7 @@ use imap_proto::{
use crate::core::{ImapUidToId, MailboxId, SelectedMailbox, Session, SessionData};
use common::listener::SessionStream;
use jmap::email::ingest::IngestEmail;
use jmap::email::ingest::{IngestEmail, IngestSource};
use jmap_proto::types::{acl::Acl, keyword::Keyword, state::StateChange, type_state::DataType};
use mail_parser::MessageParser;
@ -131,7 +131,7 @@ impl<T: SessionStream> SessionData<T> {
mailbox_ids: vec![mailbox_id],
keywords: message.flags.into_iter().map(Keyword::from).collect(),
received_at: message.received_at.map(|d| d as u64),
skip_duplicates: false,
source: IngestSource::Imap,
encrypt: self.jmap.core.jmap.encrypt && self.jmap.core.jmap.encrypt_append,
})
.await

View file

@ -21,7 +21,7 @@
* for more details.
*/
use common::{listener::SessionStream, AuthResult};
use common::{config::server::ServerProtocol, listener::SessionStream, AuthResult};
use imap_proto::{
protocol::{authenticate::Mechanism, capability::Capability},
receiver::{self, Request},
@ -122,7 +122,7 @@ impl<T: SessionStream> Session<T> {
Credentials::Plain { username, secret } | Credentials::XOauth2 { username, secret } => {
match self
.jmap
.authenticate_plain(&username, &secret, self.remote_addr)
.authenticate_plain(&username, &secret, self.remote_addr, ServerProtocol::Imap)
.await
{
AuthResult::Success(token) => Some(token),

View file

@ -23,7 +23,7 @@
use std::{net::IpAddr, sync::Arc, time::Instant};
use common::{listener::limiter::InFlight, AuthResult};
use common::{config::server::ServerProtocol, listener::limiter::InFlight, AuthResult};
use directory::{Principal, QueryBy};
use hyper::header;
use jmap_proto::error::request::RequestError;
@ -63,8 +63,9 @@ impl JMAP {
})
})
{
if let AuthResult::Success(access_token) =
self.authenticate_plain(&account, &secret, remote_ip).await
if let AuthResult::Success(access_token) = self
.authenticate_plain(&account, &secret, remote_ip, ServerProtocol::Http)
.await
{
Some(access_token)
} else {
@ -154,16 +155,19 @@ impl JMAP {
username: &str,
secret: &str,
remote_ip: IpAddr,
protocol: ServerProtocol,
) -> AuthResult<AccessToken> {
match self
.core
.authenticate(
&self.core.storage.directory,
&self.smtp.inner.ipc,
&Credentials::Plain {
username: username.to_string(),
secret: secret.to_string(),
},
remote_ip,
protocol,
true,
)
.await

View file

@ -314,8 +314,9 @@ impl JMAP {
};
// Check quota
if account_quota > 0
&& metadata.size as i64 + self.get_used_quota(account_id).await? > account_quota
if !self
.has_available_quota(account_id, account_quota, metadata.size as i64)
.await?
{
return Ok(Err(SetError::over_quota()));
}

View file

@ -41,7 +41,7 @@ use utils::map::vec_map::VecMap;
use crate::{auth::AccessToken, IngestError, JMAP};
use super::ingest::IngestEmail;
use super::ingest::{IngestEmail, IngestSource};
impl JMAP {
pub async fn email_import(
@ -140,7 +140,7 @@ impl JMAP {
mailbox_ids,
keywords: email.keywords,
received_at: email.received_at.map(|r| r.into()),
skip_duplicates: true,
source: IngestSource::Jmap,
encrypt: self.core.jmap.encrypt && self.core.jmap.encrypt_append,
})
.await

View file

@ -23,6 +23,7 @@
use std::{borrow::Cow, time::Duration};
use common::webhooks::{WebhookIngestSource, WebhookPayload, WebhookType};
use jmap_proto::{
object::Object,
types::{
@ -76,10 +77,17 @@ pub struct IngestEmail<'x> {
pub mailbox_ids: Vec<u32>,
pub keywords: Vec<Keyword>,
pub received_at: Option<u64>,
pub skip_duplicates: bool,
pub source: IngestSource,
pub encrypt: bool,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum IngestSource {
Smtp,
Jmap,
Imap,
}
const MAX_RETRIES: u32 = 10;
impl JMAP {
@ -90,13 +98,10 @@ impl JMAP {
) -> Result<IngestedEmail, IngestError> {
// Check quota
let mut raw_message_len = params.raw_message.len() as i64;
if params.account_quota > 0
&& raw_message_len
+ self
.get_used_quota(params.account_id)
.await
.map_err(|_| IngestError::Temporary)?
> params.account_quota
if !self
.has_available_quota(params.account_id, params.account_quota, raw_message_len)
.await
.map_err(|_| IngestError::Temporary)?
{
return Err(IngestError::OverQuota);
}
@ -162,7 +167,7 @@ impl JMAP {
}
// Check for duplicates
if params.skip_duplicates
if params.source == IngestSource::Smtp
&& !message_id.is_empty()
&& !self
.core
@ -387,6 +392,31 @@ impl JMAP {
size = raw_message_len,
"Ingested e-mail.");
// Send webhook event
if self
.core
.has_webhook_subscribers(WebhookType::MessageAppended)
{
self.smtp
.inner
.ipc
.send_webhook(
WebhookType::MessageAppended,
WebhookPayload::MessageAppended {
account_id: params.account_id,
mailbox_ids: params.mailbox_ids,
source: match params.source {
IngestSource::Smtp => WebhookIngestSource::Smtp,
IngestSource::Jmap => WebhookIngestSource::Jmap,
IngestSource::Imap => WebhookIngestSource::Imap,
},
encrypt: params.encrypt,
size: raw_message_len as usize,
},
)
.await;
}
Ok(IngestedEmail {
id,
change_id,

View file

@ -63,7 +63,7 @@ use crate::{auth::AccessToken, mailbox::UidMailbox, IngestError, JMAP};
use super::{
headers::{BuildHeader, ValueToHeader},
ingest::IngestEmail,
ingest::{IngestEmail, IngestSource},
};
impl JMAP {
@ -737,7 +737,7 @@ impl JMAP {
mailbox_ids: mailboxes,
keywords,
received_at,
skip_duplicates: false,
source: IngestSource::Jmap,
encrypt: self.core.jmap.encrypt && self.core.jmap.encrypt_append,
})
.await

View file

@ -29,7 +29,11 @@ use std::{
};
use auth::{rate_limit::ConcurrencyLimiters, AccessToken};
use common::{manager::webadmin::WebAdminManager, Core, DeliveryEvent, SharedCore};
use common::{
manager::webadmin::WebAdminManager,
webhooks::{WebhookPayload, WebhookType},
Core, DeliveryEvent, SharedCore,
};
use dashmap::DashMap;
use directory::QueryBy;
use email::cache::Threads;
@ -404,6 +408,43 @@ impl JMAP {
})
}
pub async fn has_available_quota(
&self,
account_id: u32,
account_quota: i64,
item_size: i64,
) -> Result<bool, MethodError> {
if account_quota == 0 {
return Ok(true);
}
let used_quota = self.get_used_quota(account_id).await?;
if used_quota + item_size <= account_quota {
Ok(true)
} else {
// Send webhook
if self
.core
.has_webhook_subscribers(WebhookType::AccountOverQuota)
{
self.smtp
.inner
.ipc
.send_webhook(
WebhookType::AccountOverQuota,
WebhookPayload::AccountOverQuota {
account_id,
quota_limit: account_quota as usize,
quota_used: used_quota as usize,
object_size: item_size as usize,
},
)
.await;
}
Ok(false)
}
}
pub async fn filter(
&self,
account_id: u32,

View file

@ -22,11 +22,12 @@
*/
use base64::{engine::general_purpose, Engine};
use common::IPC_CHANNEL_BUFFER;
use jmap_proto::types::id::Id;
use store::ahash::{AHashMap, AHashSet};
use tokio::sync::mpsc;
use crate::{api::StateChangeResponse, services::IPC_CHANNEL_BUFFER, JmapInstance, LONG_SLUMBER};
use crate::{api::StateChangeResponse, JmapInstance, LONG_SLUMBER};
use super::{ece::ece_encrypt, EncryptionKeys, Event, PushServer, PushUpdate};
@ -48,6 +49,9 @@ pub fn spawn_push_manager(core: JmapInstance) -> mpsc::Sender<Event> {
let mut retry_ids = AHashSet::default();
loop {
// Wait for the next event or timeout
let event_or_timeout = tokio::time::timeout(retry_timeout, push_rx.recv()).await;
// Load settings
let core_ = core.core.load();
let push_attempt_interval = core_.jmap.push_attempt_interval;
@ -57,7 +61,7 @@ pub fn spawn_push_manager(core: JmapInstance) -> mpsc::Sender<Event> {
let push_verify_timeout = core_.jmap.push_verify_timeout;
let push_throttle = core_.jmap.push_throttle;
match tokio::time::timeout(retry_timeout, push_rx.recv()).await {
match event_or_timeout {
Ok(Some(event)) => match event {
Event::Update { updates } => {
for update in updates {

View file

@ -22,11 +22,11 @@
*/
use crate::auth::SymmetricEncrypt;
use crate::services::IPC_CHANNEL_BUFFER;
use crate::JmapInstance;
use super::request::Request;
use super::{Gossiper, Peer, UDP_MAX_PAYLOAD};
use common::IPC_CHANNEL_BUFFER;
use std::net::IpAddr;
use std::time::{Duration, Instant};
use std::{net::SocketAddr, sync::Arc};

View file

@ -26,14 +26,13 @@ use std::{
time::{Duration, Instant},
};
use common::IPC_CHANNEL_BUFFER;
use store::{write::purge::PurgeStore, BlobStore, LookupStore, Store};
use tokio::sync::mpsc;
use utils::map::ttl_dashmap::TtlMap;
use crate::{Inner, JmapInstance, JMAP, LONG_SLUMBER};
use super::IPC_CHANNEL_BUFFER;
pub enum Event {
IndexStart,
IndexDone,

View file

@ -27,7 +27,11 @@ use jmap_proto::types::{state::StateChange, type_state::DataType};
use mail_parser::MessageParser;
use store::ahash::AHashMap;
use crate::{email::ingest::IngestEmail, mailbox::INBOX_ID, IngestError, JMAP};
use crate::{
email::ingest::{IngestEmail, IngestSource},
mailbox::INBOX_ID,
IngestError, JMAP,
};
impl JMAP {
pub async fn deliver_message(&self, message: IngestMessage) -> Vec<DeliveryResult> {
@ -123,7 +127,7 @@ impl JMAP {
mailbox_ids: vec![INBOX_ID],
keywords: vec![],
received_at: None,
skip_duplicates: true,
source: IngestSource::Smtp,
encrypt: self.core.jmap.encrypt,
})
.await

View file

@ -27,5 +27,3 @@ pub mod housekeeper;
pub mod index;
pub mod ingest;
pub mod state;
pub const IPC_CHANNEL_BUFFER: usize = 1024;

View file

@ -23,6 +23,7 @@
use std::time::{Duration, Instant, SystemTime};
use common::IPC_CHANNEL_BUFFER;
use jmap_proto::types::{id::Id, state::StateChange, type_state::DataType};
use store::ahash::AHashMap;
use tokio::sync::mpsc;
@ -33,8 +34,6 @@ use crate::{
JmapInstance, JMAP,
};
use super::IPC_CHANNEL_BUFFER;
#[derive(Debug)]
pub enum Event {
Subscribe {

View file

@ -35,7 +35,7 @@ use store::{
};
use crate::{
email::ingest::{IngestEmail, IngestedEmail},
email::ingest::{IngestEmail, IngestSource, IngestedEmail},
mailbox::{INBOX_ID, TRASH_ID},
sieve::SeenIdHash,
IngestError, JMAP,
@ -446,7 +446,7 @@ impl JMAP {
mailbox_ids: sieve_message.file_into,
keywords: sieve_message.flags,
received_at: None,
skip_duplicates: true,
source: IngestSource::Smtp,
encrypt: self.core.jmap.encrypt,
})
.await

View file

@ -144,9 +144,13 @@ impl JMAP {
_ => unreachable!(),
}
} else {
ctx.response.not_created.append(id, SetError::new(SetErrorType::OverQuota).with_description(
"There are too many sieve scripts, please delete some before adding a new one.",
));
ctx.response.not_created.append(
id,
SetError::new(SetErrorType::OverQuota).with_description(concat!(
"There are too many sieve scripts, ",
"please delete some before adding a new one."
)),
);
}
}
@ -422,9 +426,10 @@ impl JMAP {
// Vacation script cannot be modified
if matches!(update.as_ref().and_then(|(_, obj)| obj.inner.properties.get(&Property::Name)), Some(Value::Text ( value )) if value.eq_ignore_ascii_case("vacation"))
{
return Ok(Err(SetError::forbidden().with_description(
"The 'vacation' script cannot be modified, use VacationResponse/set instead.",
)));
return Ok(Err(SetError::forbidden().with_description(concat!(
"The 'vacation' script cannot be modified, ",
"use VacationResponse/set instead."
))));
}
// Parse properties
@ -521,12 +526,12 @@ impl JMAP {
// Check access
if let Some(mut bytes) = self.blob_download(&blob_id, ctx.access_token).await? {
// Check quota
if ctx.account_quota > 0
&& bytes.len() as i64 + self.get_used_quota(ctx.account_id).await?
> ctx.account_quota
{
return Ok(Err(SetError::over_quota()));
}
if !self
.has_available_quota(ctx.account_id, ctx.account_quota, bytes.len() as i64)
.await?
{
return Ok(Err(SetError::over_quota()));
}
// Compile script
match self.core.sieve.untrusted_compiler.compile(&bytes) {

View file

@ -19,7 +19,7 @@ path = "src/main.rs"
store = { path = "../store" }
jmap = { path = "../jmap" }
jmap_proto = { path = "../jmap-proto" }
smtp = { path = "../smtp", features = ["local_delivery"] }
smtp = { path = "../smtp" }
imap = { path = "../imap" }
pop3 = { path = "../pop3" }
managesieve = { path = "../managesieve" }

View file

@ -23,13 +23,12 @@
use std::time::Duration;
use common::{config::server::ServerProtocol, manager::boot::BootManager};
use imap::core::{ImapSessionManager, IMAP};
use jmap::{
api::JmapSessionManager,
services::{gossip::spawn::GossiperBuilder, IPC_CHANNEL_BUFFER},
JMAP,
use common::{
config::server::ServerProtocol, manager::boot::BootManager,
webhooks::manager::spawn_webhook_manager, Ipc, IPC_CHANNEL_BUFFER,
};
use imap::core::{ImapSessionManager, IMAP};
use jmap::{api::JmapSessionManager, services::gossip::spawn::GossiperBuilder, JMAP};
use managesieve::core::ManageSieveSessionManager;
use pop3::Pop3SessionManager;
use smtp::core::{SmtpSessionManager, SMTP};
@ -52,9 +51,18 @@ async fn main() -> std::io::Result<()> {
let mut config = init.config;
let core = init.core;
// Init servers
// Spawn webhook manager
let webhook_tx = spawn_webhook_manager(core.clone());
// Setup IPC channels
let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER);
let smtp = SMTP::init(&mut config, core.clone(), delivery_tx).await;
let ipc = Ipc {
delivery_tx,
webhook_tx,
};
// Init servers
let smtp = SMTP::init(&mut config, core.clone(), ipc).await;
let jmap = JMAP::init(&mut config, delivery_rx, core.clone(), smtp.inner.clone()).await;
let imap = IMAP::init(&mut config, jmap.clone()).await;
let gossiper = GossiperBuilder::try_parse(&mut config);
@ -101,7 +109,7 @@ async fn main() -> std::io::Result<()> {
// Spawn gossip
if let Some(gossiper) = gossiper {
gossiper.spawn(jmap, shutdown_rx).await;
gossiper.spawn(jmap, shutdown_rx.clone()).await;
}
// Wait for shutdown signal

View file

@ -22,6 +22,7 @@
*/
use common::{
config::server::ServerProtocol,
listener::{limiter::ConcurrencyLimiter, SessionStream},
AuthResult,
};
@ -99,7 +100,12 @@ impl<T: SessionStream> Session<T> {
Credentials::Plain { username, secret } | Credentials::XOauth2 { username, secret } => {
match self
.jmap
.authenticate_plain(&username, &secret, self.remote_addr)
.authenticate_plain(
&username,
&secret,
self.remote_addr,
ServerProtocol::ManageSieve,
)
.await
{
AuthResult::Success(token) => Some(token),

View file

@ -55,12 +55,18 @@ impl<T: AsyncRead + AsyncWrite> Session<T> {
// Check quota
let access_token = self.state.access_token();
let account_id = access_token.primary_id();
if access_token.quota > 0
&& script_bytes.len() as i64 + self.jmap.get_used_quota(account_id).await?
> access_token.quota as i64
if !self
.jmap
.has_available_quota(
account_id,
access_token.quota as i64,
script_bytes.len() as i64,
)
.await?
{
return Err(StatusResponse::no("Quota exceeded.").with_code(ResponseCode::Quota));
}
if self
.jmap
.get_document_ids(account_id, Collection::SieveScript)

View file

@ -22,6 +22,7 @@
*/
use common::{
config::server::ServerProtocol,
listener::{limiter::ConcurrencyLimiter, SessionStream},
AuthResult,
};
@ -103,7 +104,7 @@ impl<T: SessionStream> Session<T> {
Credentials::Plain { username, secret } | Credentials::XOauth2 { username, secret } => {
match self
.jmap
.authenticate_plain(&username, &secret, self.remote_addr)
.authenticate_plain(&username, &secret, self.remote_addr, ServerProtocol::Pop3)
.await
{
AuthResult::Success(token) => Some(token),

View file

@ -61,7 +61,6 @@ bincode = "1.3.1"
[features]
test_mode = []
local_delivery = []
#[[bench]]
#name = "hash"

View file

@ -29,12 +29,12 @@ use std::{
};
use common::{
config::smtp::auth::VerifyStrategy,
config::{scripts::ScriptCache, smtp::auth::VerifyStrategy},
listener::{
limiter::{ConcurrencyLimiter, InFlight},
ServerInstance,
},
Core, DeliveryEvent, SharedCore,
Core, Ipc, SharedCore,
};
use dashmap::DashMap;
use directory::Directory;
@ -100,8 +100,8 @@ pub struct Inner {
pub report_tx: mpsc::Sender<reporting::Event>,
pub snowflake_id: SnowflakeIdGenerator,
pub connectors: TlsConnectors,
#[cfg(feature = "local_delivery")]
pub delivery_tx: mpsc::Sender<DeliveryEvent>,
pub ipc: Ipc,
pub script_cache: ScriptCache,
}
pub struct TlsConnectors {
@ -279,7 +279,6 @@ impl From<SmtpInstance> for SMTP {
}
}
#[cfg(feature = "local_delivery")]
lazy_static::lazy_static! {
static ref SIEVE: Arc<ServerInstance> = Arc::new(ServerInstance {
id: "sieve".to_string(),
@ -291,7 +290,6 @@ static ref SIEVE: Arc<ServerInstance> = Arc::new(ServerInstance {
});
}
#[cfg(feature = "local_delivery")]
impl Session<common::listener::stream::NullIo> {
pub fn local(core: SMTP, instance: std::sync::Arc<ServerInstance>, data: SessionData) -> Self {
Session {
@ -368,7 +366,6 @@ impl Session<common::listener::stream::NullIo> {
}
}
#[cfg(feature = "local_delivery")]
impl SessionData {
pub fn local(
mail_from: Option<SessionAddress>,
@ -404,14 +401,12 @@ impl SessionData {
}
}
#[cfg(feature = "local_delivery")]
impl Default for SessionData {
fn default() -> Self {
Self::local(None, vec![], vec![])
}
}
#[cfg(feature = "local_delivery")]
impl SessionAddress {
pub fn new(address: String) -> Self {
let address_lcase = address.to_lowercase();
@ -438,7 +433,11 @@ impl Default for Inner {
pki_verify: mail_send::smtp::tls::build_tls_connector(false),
dummy_verify: mail_send::smtp::tls::build_tls_connector(true),
},
delivery_tx: mpsc::channel(1).0,
ipc: Ipc {
delivery_tx: mpsc::channel(1).0,
webhook_tx: mpsc::channel(1).0,
},
script_cache: Default::default(),
}
}
}

View file

@ -183,7 +183,14 @@ impl<T: SessionStream> Session<T> {
match self
.core
.core
.authenticate(directory, &credentials, self.data.remote_ip, false)
.authenticate(
directory,
&self.core.inner.ipc,
&credentials,
self.data.remote_ip,
self.instance.protocol,
false,
)
.await
{
Ok(AuthResult::Success(principal)) => {

View file

@ -32,12 +32,14 @@ use common::{
config::smtp::{auth::VerifyStrategy, session::Stage},
listener::SessionStream,
scripts::ScriptModification,
webhooks::{WebhookMessageFailure, WebhookPayload, WebhookType},
};
use mail_auth::{
common::{headers::HeaderWriter, verify::VerifySignature},
dmarc, AuthenticatedMessage, AuthenticationResults, DkimResult, DmarcResult, ReceivedSpf,
};
use mail_builder::headers::{date::Date, message_id::generate_message_id_header};
use mail_parser::DateTime;
use sieve::runtime::Variable;
use smtp_proto::{
MAIL_BY_RETURN, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS,
@ -70,6 +72,9 @@ impl<T: SessionStream> Session<T> {
event = "parse-failed",
size = raw_message.len());
self.send_failure_webhook(WebhookMessageFailure::ParseFailed)
.await;
return (&b"550 5.7.7 Failed to parse message.\r\n"[..]).into();
};
@ -91,6 +96,10 @@ impl<T: SessionStream> Session<T> {
return_path = self.data.mail_from.as_ref().unwrap().address,
from = auth_message.from(),
received_headers = auth_message.received_headers_count());
self.send_failure_webhook(WebhookMessageFailure::LoopDetected)
.await;
return (&b"450 4.4.6 Too many Received headers. Possible loop detected.\r\n"[..])
.into();
}
@ -141,6 +150,9 @@ impl<T: SessionStream> Session<T> {
result = ?dkim_output.iter().map(|d| d.result().to_string()).collect::<Vec<_>>(),
"No passing DKIM signatures found.");
self.send_failure_webhook(WebhookMessageFailure::DkimPolicy)
.await;
// 'Strict' mode violates the advice of Section 6.1 of RFC6376
return if dkim_output
.iter()
@ -197,6 +209,9 @@ impl<T: SessionStream> Session<T> {
result = %arc_output.result(),
"ARC validation failed.");
self.send_failure_webhook(WebhookMessageFailure::ArcPolicy)
.await;
return if matches!(arc_output.result(), DkimResult::TempError(_)) {
(&b"451 4.7.29 ARC validation failed.\r\n"[..]).into()
} else {
@ -316,6 +331,9 @@ impl<T: SessionStream> Session<T> {
}
if rejected {
self.send_failure_webhook(WebhookMessageFailure::DmarcPolicy)
.await;
return if is_temp_fail {
(&b"451 4.7.1 Email temporarily rejected per DMARC policy.\r\n"[..]).into()
} else {
@ -421,7 +439,12 @@ impl<T: SessionStream> Session<T> {
modifications = modifications_;
}
}
Err(response) => return response.into_bytes(),
Err(response) => {
self.send_failure_webhook(WebhookMessageFailure::MilterReject)
.await;
return response.into_bytes();
}
};
// Run JMilter filters
@ -438,7 +461,12 @@ impl<T: SessionStream> Session<T> {
modifications.extend(modifications_);
}
}
Err(response) => return response.into_bytes(),
Err(response) => {
self.send_failure_webhook(WebhookMessageFailure::MilterReject)
.await;
return response.into_bytes();
}
};
// Apply modifications
@ -624,9 +652,15 @@ impl<T: SessionStream> Session<T> {
event = "reject",
reason = message);
self.send_failure_webhook(WebhookMessageFailure::SieveReject)
.await;
return message.into_bytes().into();
}
ScriptResult::Discard => {
self.send_failure_webhook(WebhookMessageFailure::SieveDiscard)
.await;
return (b"250 2.0.0 Message queued for delivery.\r\n"[..]).into();
}
};
@ -725,15 +759,52 @@ impl<T: SessionStream> Session<T> {
// Verify queue quota
if self.core.has_quota(&mut message).await {
// Prepare webhook event
let queue_id = message.id;
let webhook_event = self
.core
.core
.has_webhook_subscribers(WebhookType::MessageAccepted)
.then(|| WebhookPayload::MessageAccepted {
id: queue_id,
remote_ip: self.data.remote_ip.into(),
local_port: self.data.local_port.into(),
authenticated_as: (!self.data.authenticated_as.is_empty())
.then(|| self.data.authenticated_as.clone()),
return_path: message.return_path_lcase.clone(),
recipients: message
.recipients
.iter()
.map(|r| r.address_lcase.clone())
.collect(),
next_retry: DateTime::from_timestamp(message.next_delivery_event() as i64)
.to_rfc3339(),
next_dsn: DateTime::from_timestamp(message.next_dsn() as i64).to_rfc3339(),
expires: DateTime::from_timestamp(message.expires() as i64).to_rfc3339(),
size: message.size,
});
// Queue message
if message
.queue(Some(&headers), raw_message, &self.core, &self.span)
.await
{
// Send webhook event
if let Some(event) = webhook_event {
self.core
.inner
.ipc
.send_webhook(WebhookType::MessageAccepted, event)
.await;
}
self.state = State::Accepted(queue_id);
self.data.messages_sent += 1;
(b"250 2.0.0 Message queued for delivery.\r\n"[..]).into()
} else {
self.send_failure_webhook(WebhookMessageFailure::ServerFailure)
.await;
(b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into()
}
} else {
@ -744,6 +815,10 @@ impl<T: SessionStream> Session<T> {
from = message.return_path,
"Queue quota exceeded, rejecting message."
);
self.send_failure_webhook(WebhookMessageFailure::QuotaExceeded)
.await;
(b"452 4.3.1 Mail system full, try again later.\r\n"[..]).into()
}
}
@ -955,4 +1030,38 @@ impl<T: SessionStream> Session<T> {
headers.extend_from_slice(Date::now().to_rfc822().as_bytes());
headers.extend_from_slice(b"\r\n");
}
async fn send_failure_webhook(&self, reason: WebhookMessageFailure) {
if self
.core
.core
.has_webhook_subscribers(WebhookType::MessageRejected)
{
self.core
.inner
.ipc
.send_webhook(
WebhookType::MessageRejected,
WebhookPayload::MessageRejected {
reason,
remote_ip: self.data.remote_ip,
local_port: self.data.local_port,
authenticated_as: (!self.data.authenticated_as.is_empty())
.then(|| self.data.authenticated_as.clone()),
return_path: self
.data
.mail_from
.as_ref()
.map(|m| m.address_lcase.clone()),
recipients: self
.data
.rcpt_to
.iter()
.map(|r| r.address_lcase.clone())
.collect(),
},
)
.await;
}
}
}

View file

@ -28,7 +28,7 @@ use common::{
config::smtp::session::{Mechanism, Stage},
listener::SessionStream,
};
use mail_auth::spf::verify::HasLabels;
use mail_auth::spf::verify::HasValidLabels;
use smtp_proto::*;
impl<T: SessionStream> Session<T> {
@ -37,7 +37,7 @@ impl<T: SessionStream> Session<T> {
if domain != self.data.helo_domain {
// Reject non-FQDN EHLO domains - simply checks that the hostname has at least one dot
if self.params.ehlo_reject_non_fqdn && !domain.as_str().has_labels() {
if self.params.ehlo_reject_non_fqdn && !domain.as_str().has_valid_labels() {
tracing::info!(parent: &self.span,
context = "ehlo",
event = "reject",

View file

@ -36,10 +36,14 @@ pub(super) async fn send_jmilter_request(
.map_err(|err| format!("Failed to create HTTP client: {}", err))?
.post(&jmilter.url)
.headers(jmilter.headers.clone())
.body(serde_json::to_string(&request).unwrap())
.body(
serde_json::to_string(&request)
.map_err(|err| format!("Failed to serialize jMilter request: {}", err))?,
)
.send()
.await
.map_err(|err| format!("jMilter request failed: {err}"))?;
if response.status().is_success() {
serde_json::from_slice(
response

View file

@ -25,6 +25,7 @@ use ahash::AHashMap;
use common::{
config::smtp::session::{JMilter, Stage},
listener::SessionStream,
DAEMON_NAME,
};
use mail_auth::AuthenticatedMessage;
@ -37,7 +38,6 @@ use crate::{
milter::Modification,
FilterResponse,
},
DAEMON_NAME,
};
use super::{client::send_jmilter_request, Action, Response};

View file

@ -26,6 +26,7 @@ use std::borrow::Cow;
use common::{
config::smtp::session::{Milter, Stage},
listener::SessionStream,
DAEMON_NAME,
};
use mail_auth::AuthenticatedMessage;
use smtp_proto::{request::parser::Rfc5321Parser, IntoString};
@ -35,7 +36,6 @@ use crate::{
core::{Session, SessionAddress, SessionData},
inbound::{milter::MilterClient, FilterResponse},
queue::DomainPart,
DAEMON_NAME,
};
use super::{Action, Error, Macros, Modification};

View file

@ -164,7 +164,7 @@ impl<T: SessionStream> Session<T> {
}
Request::Help { .. } => {
self.write(
b"250 2.0.0 Help can be found at https://stalw.art/smtp/\r\n",
b"250 2.0.0 Help can be found at https://stalw.art/docs/\r\n",
)
.await?;
}

View file

@ -82,10 +82,10 @@ impl SessionManager for SmtpSessionManager {
.report_tx
.send(reporting::Event::Stop)
.await;
#[cfg(feature = "local_delivery")]
let _ = self
.inner
.inner
.ipc
.delivery_tx
.send(common::DeliveryEvent::Stop)
.await;

View file

@ -24,7 +24,7 @@
use crate::core::{throttle::ThrottleKeyHasherBuilder, TlsConnectors};
use core::{Inner, SmtpInstance, SMTP};
use common::SharedCore;
use common::{config::scripts::ScriptCache, Ipc, SharedCore};
use dashmap::DashMap;
use mail_send::smtp::tls::build_tls_connector;
use queue::manager::SpawnQueue;
@ -39,15 +39,8 @@ pub mod queue;
pub mod reporting;
pub mod scripts;
pub static USER_AGENT: &str = concat!("StalwartSMTP/", env!("CARGO_PKG_VERSION"),);
pub static DAEMON_NAME: &str = concat!("Stalwart SMTP v", env!("CARGO_PKG_VERSION"),);
impl SMTP {
pub async fn init(
config: &mut Config,
core: SharedCore,
#[cfg(feature = "local_delivery")] delivery_tx: mpsc::Sender<common::DeliveryEvent>,
) -> SmtpInstance {
pub async fn init(config: &mut Config, core: SharedCore, ipc: Ipc) -> SmtpInstance {
// Build inner
let capacity = config.property("cache.capacity").unwrap_or(2);
let shard = config
@ -77,8 +70,8 @@ impl SMTP {
pki_verify: build_tls_connector(false),
dummy_verify: build_tls_connector(true),
},
#[cfg(feature = "local_delivery")]
delivery_tx,
ipc,
script_cache: ScriptCache::parse(config),
};
let inner = SmtpInstance::new(core, inner);

View file

@ -202,13 +202,12 @@ impl DeliveryAttempt {
.await
.and_then(|name| core.core.get_relay_host(&name))
{
#[cfg(feature = "local_delivery")]
Some(next_hop) if next_hop.protocol == ServerProtocol::Http => {
// Deliver message locally
let delivery_result = message
.deliver_local(
recipients.iter_mut().filter(|r| r.domain_idx == domain_idx),
&core.inner.delivery_tx,
&core.inner.ipc.delivery_tx,
&span,
)
.await;

View file

@ -36,7 +36,7 @@ use crate::queue::{
pub mod dane;
pub mod delivery;
#[cfg(feature = "local_delivery")]
pub mod local;
pub mod lookup;
pub mod mta_sts;
@ -158,7 +158,6 @@ impl Status<(), Error> {
}))
}
#[cfg(feature = "local_delivery")]
pub fn local_error() -> Self {
Status::TemporaryFailure(Error::ConnectionError(ErrorDetails {
entity: "localhost".to_string(),

View file

@ -21,6 +21,7 @@
* for more details.
*/
use common::webhooks::{WebhookDSN, WebhookDSNType, WebhookPayload, WebhookType};
use mail_builder::headers::content_type::ContentType;
use mail_builder::headers::HeaderType;
use mail_builder::mime::{make_boundary, BodyPart, MimePart};
@ -42,7 +43,11 @@ use super::{
impl SMTP {
pub async fn send_dsn(&self, message: &mut Message, span: &tracing::Span) {
// Send webhook event
self.send_dsn_webhook(message).await;
if !message.return_path.is_empty() {
// Build DSN
if let Some(dsn) = message.build_dsn(self, span).await {
let mut dsn_message = self.new_message("", "", "");
dsn_message
@ -58,14 +63,142 @@ impl SMTP {
let signature = self
.sign_message(message, &self.core.smtp.queue.dsn.sign, &dsn, span)
.await;
// Queue DSN
dsn_message
.queue(signature.as_deref(), &dsn, self, span)
.await;
}
} else {
// Handle double bounce
message.handle_double_bounce(span);
}
}
async fn send_dsn_webhook(&self, message: &Message) {
let typ = if !message.return_path.is_empty() {
WebhookType::DSN
} else {
WebhookType::DoubleBounce
};
if !self.core.has_webhook_subscribers(typ) {
return;
}
let now = now();
let mut webhook_data = Vec::new();
for rcpt in &message.recipients {
if rcpt.has_flag(RCPT_DSN_SENT) {
continue;
}
let domain = &message.domains[rcpt.domain_idx];
match &rcpt.status {
Status::Completed(response) => {
webhook_data.push(WebhookDSN {
address: rcpt.address_lcase.clone(),
typ: WebhookDSNType::Success,
remote_host: response.hostname.clone().into(),
message: response.response.to_string(),
next_retry: None,
expires: None,
retry_count: None,
});
}
Status::TemporaryFailure(response) if domain.notify.due <= now => {
webhook_data.push(WebhookDSN {
address: rcpt.address_lcase.clone(),
typ: WebhookDSNType::TemporaryFailure,
remote_host: response.hostname.entity.clone().into(),
message: response.response.to_string(),
next_retry: DateTime::from_timestamp(domain.retry.due as i64)
.to_rfc3339()
.into(),
expires: DateTime::from_timestamp(domain.expires as i64)
.to_rfc3339()
.into(),
retry_count: domain.retry.inner.into(),
});
}
Status::PermanentFailure(response) => {
webhook_data.push(WebhookDSN {
address: rcpt.address_lcase.clone(),
typ: WebhookDSNType::PermanentFailure,
remote_host: response.hostname.entity.clone().into(),
message: response.response.to_string(),
next_retry: None,
expires: None,
retry_count: domain.retry.inner.into(),
});
}
Status::Scheduled => {
// There is no status for this address, use the domain's status.
match &domain.status {
Status::PermanentFailure(err) => {
webhook_data.push(WebhookDSN {
address: rcpt.address_lcase.clone(),
typ: WebhookDSNType::PermanentFailure,
remote_host: None,
message: err.to_string(),
next_retry: None,
expires: None,
retry_count: domain.retry.inner.into(),
});
}
Status::TemporaryFailure(err) if domain.notify.due <= now => {
webhook_data.push(WebhookDSN {
address: rcpt.address_lcase.clone(),
typ: WebhookDSNType::TemporaryFailure,
remote_host: None,
message: err.to_string(),
next_retry: DateTime::from_timestamp(domain.retry.due as i64)
.to_rfc3339()
.into(),
expires: DateTime::from_timestamp(domain.expires as i64)
.to_rfc3339()
.into(),
retry_count: domain.retry.inner.into(),
});
}
Status::Scheduled if domain.notify.due <= now => {
webhook_data.push(WebhookDSN {
address: rcpt.address_lcase.clone(),
typ: WebhookDSNType::TemporaryFailure,
remote_host: None,
message: "Concurrency limited".to_string(),
next_retry: DateTime::from_timestamp(domain.retry.due as i64)
.to_rfc3339()
.into(),
expires: DateTime::from_timestamp(domain.expires as i64)
.to_rfc3339()
.into(),
retry_count: domain.retry.inner.into(),
});
}
_ => continue,
}
}
_ => continue,
}
}
// Send webhook event
if !webhook_data.is_empty() {
self.inner
.ipc
.send_webhook(
typ,
WebhookPayload::DSN {
id: message.id,
sender: message.return_path_lcase.clone(),
status: webhook_data,
created: DateTime::from_timestamp(message.created as i64).to_rfc3339(),
},
)
.await;
}
}
}
impl Message {
@ -273,7 +406,7 @@ impl Message {
// Prepare DSN
let mut dsn_header = String::with_capacity(dsn.len() + 128);
self.write_dsn_headers(&mut dsn_header, &reporting_mta);
let dsn = dsn_header + &dsn;
let dsn = dsn_header + dsn.as_str();
// Fetch up to 1024 bytes of message headers
let headers = match core

View file

@ -165,6 +165,40 @@ impl Message {
next_delivery
}
pub fn next_dsn(&self) -> u64 {
let mut next_dsn = now();
for (pos, domain) in self
.domains
.iter()
.filter(|d| matches!(d.status, Status::Scheduled | Status::TemporaryFailure(_)))
.enumerate()
{
if pos == 0 || domain.notify.due < next_dsn {
next_dsn = domain.notify.due;
}
}
next_dsn
}
pub fn expires(&self) -> u64 {
let mut expires = now();
for (pos, domain) in self
.domains
.iter()
.filter(|d| matches!(d.status, Status::Scheduled | Status::TemporaryFailure(_)))
.enumerate()
{
if pos == 0 || domain.expires < expires {
expires = domain.expires;
}
}
expires
}
pub fn next_event_after(&self, instant: u64) -> Option<u64> {
let mut next_event = None;

View file

@ -30,6 +30,7 @@ use std::{
};
use ahash::AHashMap;
use common::webhooks::{WebhookPayload, WebhookTlsPolicy, WebhookType};
use mail_auth::{
flate2::read::GzDecoder,
report::{tlsrpt::TlsReport, ActionDisposition, DmarcResult, Feedback, Report},
@ -233,6 +234,21 @@ impl SMTP {
let report = match report.format {
Format::Dmarc(_) => match Report::parse_xml(&data) {
Ok(report) => {
// Send webhook
if core
.core
.has_webhook_subscribers(WebhookType::IncomingDmarcReport)
{
core.inner
.ipc
.send_webhook(
WebhookType::IncomingDmarcReport,
report.webhook_payload(),
)
.await;
}
// Log
report.log();
Format::Dmarc(report)
}
@ -248,6 +264,22 @@ impl SMTP {
},
Format::Tls(_) => match TlsReport::parse_json(&data) {
Ok(report) => {
// Send webhook
if core
.core
.has_webhook_subscribers(WebhookType::IncomingTlsReport)
{
core.inner
.ipc
.send_webhook(
WebhookType::IncomingTlsReport,
report.webhook_payload(),
)
.await;
}
// Log
report.log();
Format::Tls(report)
}
@ -263,6 +295,21 @@ impl SMTP {
},
Format::Arf(_) => match Feedback::parse_arf(&data) {
Some(report) => {
// Send webhook
if core
.core
.has_webhook_subscribers(WebhookType::IncomingArfReport)
{
core.inner
.ipc
.send_webhook(
WebhookType::IncomingArfReport,
report.webhook_payload(),
)
.await;
}
// Log
report.log();
Format::Arf(report.into_owned())
}
@ -339,6 +386,7 @@ impl SMTP {
trait LogReport {
fn log(&self);
fn webhook_payload(&self) -> WebhookPayload;
}
impl LogReport for Report {
@ -440,6 +488,81 @@ impl LogReport for Report {
);
}
}
fn webhook_payload(&self) -> WebhookPayload {
let mut dmarc_pass = 0;
let mut dmarc_quarantine = 0;
let mut dmarc_reject = 0;
let mut dmarc_none = 0;
let mut dkim_pass = 0;
let mut dkim_fail = 0;
let mut dkim_none = 0;
let mut spf_pass = 0;
let mut spf_fail = 0;
let mut spf_none = 0;
for record in self.records() {
let count = std::cmp::min(record.count(), 1);
match record.action_disposition() {
ActionDisposition::Pass => {
dmarc_pass += count;
}
ActionDisposition::Quarantine => {
dmarc_quarantine += count;
}
ActionDisposition::Reject => {
dmarc_reject += count;
}
ActionDisposition::None | ActionDisposition::Unspecified => {
dmarc_none += count;
}
}
match record.dmarc_dkim_result() {
DmarcResult::Pass => {
dkim_pass += count;
}
DmarcResult::Fail => {
dkim_fail += count;
}
DmarcResult::Unspecified => {
dkim_none += count;
}
}
match record.dmarc_spf_result() {
DmarcResult::Pass => {
spf_pass += count;
}
DmarcResult::Fail => {
spf_fail += count;
}
DmarcResult::Unspecified => {
spf_none += count;
}
}
}
let range_from = DateTime::from_timestamp(self.date_range_begin() as i64).to_rfc3339();
let range_to = DateTime::from_timestamp(self.date_range_end() as i64).to_rfc3339();
WebhookPayload::IncomingDmarcReport {
range_from,
range_to,
domain: self.domain().to_string(),
report_email: self.email().to_string(),
report_id: self.report_id().to_string(),
dmarc_pass,
dmarc_quarantine,
dmarc_reject,
dmarc_none,
dkim_pass,
dkim_fail,
dkim_none,
spf_pass,
spf_fail,
spf_none,
}
}
}
impl LogReport for TlsReport {
@ -489,6 +612,39 @@ impl LogReport for TlsReport {
}
}
}
fn webhook_payload(&self) -> WebhookPayload {
let mut policies = Vec::with_capacity(self.policies.len());
for policy in self.policies.iter().take(5) {
let mut details = AHashMap::with_capacity(policy.failure_details.len());
for failure in &policy.failure_details {
let num_failures = std::cmp::min(1, failure.failed_session_count);
match details.entry(failure.result_type) {
Entry::Occupied(mut e) => {
*e.get_mut() += num_failures;
}
Entry::Vacant(e) => {
e.insert(num_failures);
}
}
}
policies.push(WebhookTlsPolicy {
range_from: self.date_range.start_datetime.to_rfc3339(),
range_to: self.date_range.end_datetime.to_rfc3339(),
domain: policy.policy.policy_domain.clone(),
report_contact: self.contact_info.clone(),
report_id: self.report_id.clone(),
policy_type: policy.policy.policy_type,
total_successes: policy.summary.total_success,
total_failures: policy.summary.total_failure,
details,
});
}
WebhookPayload::IncomingTlsReport { policies }
}
}
impl LogReport for Feedback<'_> {
@ -517,4 +673,34 @@ impl LogReport for Feedback<'_> {
identity_alignment = ?self.identity_alignment(),
);
}
fn webhook_payload(&self) -> WebhookPayload {
WebhookPayload::IncomingArfReport {
feedback_type: self.feedback_type(),
arrival_date: self
.arrival_date()
.map(|a| DateTime::from_timestamp(a).to_rfc3339()),
authentication_results: self
.authentication_results()
.iter()
.map(|t| t.to_string())
.collect(),
incidents: self.incidents(),
reported_domain: self
.reported_domain()
.iter()
.map(|t| t.to_string())
.collect(),
reported_uri: self.reported_uri().iter().map(|t| t.to_string()).collect(),
reporting_mta: self.reporting_mta().map(|t| t.to_string()),
source_ip: self.source_ip(),
user_agent: self.user_agent().map(|t| t.to_string()),
auth_failure: self.auth_failure(),
delivery_result: self.delivery_result(),
dkim_domain: self.dkim_domain().map(|t| t.to_string()),
dkim_identity: self.dkim_identity().map(|t| t.to_string()),
dkim_selector: self.dkim_selector().map(|t| t.to_string()),
identity_alignment: self.identity_alignment(),
}
}
}

View file

@ -29,6 +29,8 @@ use common::{
resolver::{Policy, Tlsa},
},
expr::if_block::IfBlock,
webhooks::{WebhookPayload, WebhookType},
USER_AGENT,
};
use mail_auth::{
common::headers::HeaderWriter,
@ -47,7 +49,6 @@ use crate::{
core::{Session, SMTP},
inbound::DkimSign,
queue::{DomainPart, Message},
USER_AGENT,
};
pub mod analysis;
@ -165,6 +166,36 @@ impl SMTP {
}
}
// Send webhook
if self
.core
.has_webhook_subscribers(WebhookType::OutgoingReport)
{
self.inner
.ipc
.send_webhook(
WebhookType::OutgoingReport,
WebhookPayload::MessageAccepted {
id: message.id,
remote_ip: None,
local_port: None,
authenticated_as: None,
return_path: message.return_path_lcase.clone(),
recipients: message
.recipients
.iter()
.map(|r| r.address_lcase.clone())
.collect(),
next_retry: DateTime::from_timestamp(message.next_delivery_event() as i64)
.to_rfc3339(),
next_dsn: DateTime::from_timestamp(message.next_dsn() as i64).to_rfc3339(),
expires: DateTime::from_timestamp(message.expires() as i64).to_rfc3339(),
size: message.size,
},
)
.await;
}
// Queue message
message
.queue(signature.as_deref(), &report, self, span)

View file

@ -24,9 +24,12 @@
use std::{collections::hash_map::Entry, sync::Arc, time::Duration};
use ahash::AHashMap;
use common::config::smtp::{
report::AggregateFrequency,
resolver::{Mode, MxPattern},
use common::{
config::smtp::{
report::AggregateFrequency,
resolver::{Mode, MxPattern},
},
USER_AGENT,
};
use mail_auth::{
flate2::{write::GzEncoder, Compression},
@ -44,7 +47,7 @@ use store::{
Deserialize, IterateParams, Serialize, ValueKey,
};
use crate::{core::SMTP, queue::RecipientDomain, USER_AGENT};
use crate::{core::SMTP, queue::RecipientDomain};
use super::{scheduler::ToHash, AggregateTimestamp, ReportLock, SerializedSize, TlsEvent};

View file

@ -123,6 +123,7 @@ impl SMTP {
PluginContext {
span: &span,
core: &self.core,
cache: &self.inner.script_cache,
message: instance.message(),
modifications: &mut modifications,
arguments,

View file

@ -10,7 +10,7 @@ nlp = { path = "../nlp" }
rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] }
foundationdb = { version = "0.9.0", features = ["embedded-fdb-include", "fdb-7_1"], optional = true }
rusqlite = { version = "0.31.0", features = ["bundled"], optional = true }
rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls", "no-verify-ssl"], optional = true }
rust-s3 = { version = "0.34.0", default-features = false, features = ["tokio-rustls-tls", "no-verify-ssl"], optional = true }
tokio = { version = "1.23", features = ["sync", "fs", "io-util"] }
r2d2 = { version = "0.8.10", optional = true }
futures = { version = "0.3", optional = true }
@ -28,7 +28,7 @@ num_cpus = { version = "1.15.0", optional = true }
blake3 = "1.3.3"
tracing = "0.1"
lz4_flex = { version = "0.11", default-features = false }
deadpool-postgres = { version = "0.12.1", optional = true }
deadpool-postgres = { version = "0.14", optional = true }
tokio-postgres = { version = "0.7.10", optional = true }
tokio-rustls = { version = "0.25.0", optional = true }
rustls = { version = "0.22.0", optional = true }
@ -42,7 +42,7 @@ regex = "1.7.0"
flate2 = "1.0"
async-trait = "0.1.68"
redis = { version = "0.25.2", features = [ "tokio-comp", "tokio-rustls-comp", "tls-rustls-insecure", "tls-rustls-webpki-roots", "cluster-async"], optional = true }
deadpool = { version = "0.10.0", features = ["managed"], optional = true }
deadpool = { version = "0.12", features = ["managed"], optional = true }
bincode = "1.3.3"
arc-swap = "1.6.0"
bitpacking = "0.9.2"

View file

@ -21,7 +21,6 @@
* for more details.
*/
use async_trait::async_trait;
use deadpool::managed;
use redis::{
aio::{ConnectionLike, MultiplexedConnection},
@ -30,7 +29,6 @@ use redis::{
use super::{RedisClusterConnectionManager, RedisConnectionManager};
#[async_trait]
impl managed::Manager for RedisConnectionManager {
type Type = MultiplexedConnection;
type Error = crate::Error;
@ -58,7 +56,6 @@ impl managed::Manager for RedisConnectionManager {
}
}
#[async_trait]
impl managed::Manager for RedisClusterConnectionManager {
type Type = ClusterConnection;
type Error = crate::Error;

View file

@ -80,7 +80,11 @@ impl S3Store {
})
.ok()?
.with_path_style()
.with_request_timeout(timeout),
.with_request_timeout(timeout)
.map_err(|err| {
config.new_build_error(prefix.as_str(), format!("Failed to create bucket: {err:?}"))
})
.ok()?,
prefix: config.value((&prefix, "key-prefix")).map(|s| s.to_string()),
})
}

View file

@ -25,7 +25,7 @@ jmap_proto = { path = "../crates/jmap-proto" }
imap = { path = "../crates/imap", features = ["test_mode"] }
imap_proto = { path = "../crates/imap-proto" }
pop3 = { path = "../crates/pop3", features = ["test_mode"] }
smtp = { path = "../crates/smtp", features = ["test_mode", "local_delivery"] }
smtp = { path = "../crates/smtp", features = ["test_mode"] }
common = { path = "../crates/common", features = ["test_mode"] }
managesieve = { path = "../crates/managesieve", features = ["test_mode"] }
smtp-proto = { version = "0.1" }
@ -61,6 +61,7 @@ serial_test = "3.0.0"
num_cpus = "1.15.0"
async-trait = "0.1.68"
chrono = "0.4"
ring = { version = "0.17" }
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View file

@ -45,7 +45,8 @@ use std::{
use ::managesieve::core::ManageSieveSessionManager;
use common::{
config::server::{ServerProtocol, Servers},
Core,
webhooks::manager::spawn_webhook_manager,
Core, Ipc, IPC_CHANNEL_BUFFER,
};
use ::store::Stores;
@ -53,7 +54,7 @@ use ahash::AHashSet;
use directory::backend::internal::manage::ManageDirectory;
use imap::core::{ImapSessionManager, Inner, IMAP};
use imap_proto::ResponseType;
use jmap::{api::JmapSessionManager, services::IPC_CHANNEL_BUFFER, JMAP};
use jmap::{api::JmapSessionManager, JMAP};
use pop3::Pop3SessionManager;
use smtp::core::{SmtpSessionManager, SMTP};
use tokio::{
@ -321,9 +322,18 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest {
// Parse acceptors
servers.parse_tcp_acceptors(&mut config, shared_core.clone());
// Init servers
// Spawn webhook manager
let webhook_tx = spawn_webhook_manager(shared_core.clone());
// Setup IPC channels
let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER);
let smtp = SMTP::init(&mut config, shared_core.clone(), delivery_tx).await;
let ipc = Ipc {
delivery_tx,
webhook_tx,
};
// Init servers
let smtp = SMTP::init(&mut config, shared_core.clone(), ipc).await;
let jmap = JMAP::init(
&mut config,
delivery_rx,
@ -369,6 +379,7 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest {
),
};
});
// Create tables and test accounts
let lookup = DirectoryStore {
store: shared_core

View file

@ -71,6 +71,16 @@ pub async fn test(params: &mut JMAPTest) {
// Reset rate limiters
server.inner.concurrency_limiter.clear();
params.webhook.clear();
// Incorrect passwords should be rejected with a 401 error
assert!(matches!(
Client::new()
.credentials(Credentials::basic("jdoe@example.com", "abcde"))
.accept_invalid_certs(true)
.connect("https://127.0.0.1:8899")
.await,
Err(jmap_client::Error::Problem(err)) if err.status() == Some(401)));
// Wait until the beginning of the 5 seconds bucket
const LIMIT: u64 = 5;
@ -79,15 +89,6 @@ pub async fn test(params: &mut JMAPTest) {
let range_end = (range_start * LIMIT) + LIMIT;
tokio::time::sleep(Duration::from_secs(range_end - now)).await;
// Incorrect passwords should be rejected with a 401 error
assert!(matches!(
Client::new()
.credentials(Credentials::basic("jdoe@example.com", "abcde"))
.accept_invalid_certs(true)
.connect("https://127.0.0.1:8899")
.await,
Err(jmap_client::Error::Problem(err)) if err.status() == Some(401)));
// Invalid authentication requests should be rate limited
let mut n_401 = 0;
let mut n_429 = 0;
@ -280,4 +281,13 @@ pub async fn test(params: &mut JMAPTest) {
params.client.set_default_account_id(&account_id);
destroy_all_mailboxes(params).await;
assert_is_empty(server).await;
// Check webhook events
params.webhook.assert_contains(&[
"auth.failure",
"auth.success",
"auth.banned",
"\"login\": \"jdoe@example.com\"",
"\"accountType\": \"individual\"",
]);
}

View file

@ -104,6 +104,7 @@ pub async fn test(params: &mut JMAPTest) {
// Delivering to individuals
let mut lmtp = SmtpConnection::connect().await;
params.webhook.clear();
lmtp.ingest(
"bill@example.com",
@ -320,6 +321,17 @@ pub async fn test(params: &mut JMAPTest) {
destroy_all_mailboxes(params).await;
}
assert_is_empty(server).await;
// Check webhook events
params.webhook.assert_contains(&[
"message.accepted",
"message.appended",
"dsn",
"\"returnPath\": \"bill@example.com\"",
"\"sender\": \"bill@example.com\"",
"\"address\": \"john.doe@example.com\"",
"\"type\": \"success\"",
]);
}
pub struct SmtpConnection {

View file

@ -30,15 +30,12 @@ use base64::{
use common::{
config::server::{ServerProtocol, Servers},
manager::config::{ConfigManager, Patterns},
Core,
webhooks::manager::spawn_webhook_manager,
Core, Ipc, IPC_CHANNEL_BUFFER,
};
use hyper::{header::AUTHORIZATION, Method};
use imap::core::{ImapSessionManager, IMAP};
use jmap::{
api::JmapSessionManager,
services::{housekeeper::Event, IPC_CHANNEL_BUFFER},
JMAP,
};
use jmap::{api::JmapSessionManager, services::housekeeper::Event, JMAP};
use jmap_client::client::{Client, Credentials};
use jmap_proto::{error::request::RequestError, types::id::Id};
use managesieve::core::ManageSieveSessionManager;
@ -54,6 +51,7 @@ use store::{
};
use tokio::sync::{mpsc, watch};
use utils::config::Config;
use webhooks::{spawn_mock_webhook_endpoint, MockWebhookEndpoint};
use crate::{add_test_certs, directory::DirectoryStore, store::TempDir, AssertConfig};
@ -82,6 +80,7 @@ pub mod stress_test;
pub mod thread_get;
pub mod thread_merge;
pub mod vacation_response;
pub mod webhooks;
pub mod websocket;
const SERVER: &str = r#"
@ -287,6 +286,15 @@ refresh-token-renew = "2s"
expn = true
vrfy = true
[webhook."test"]
url = "http://127.0.0.1:8821/hook"
events = ["auth.success", "auth.failure", "auth.banned", "auth.error",
"message.accepted", "message.rejected", "message.appended",
"account.over-quota", "dsn", "double-bounce", "report.incoming.dmarc",
"report.incoming.tls", "report.incoming.arf", "report.outgoing"]
signature-key = "ovos-moles"
throttle = "100ms"
"#;
#[tokio::test(flavor = "multi_thread")]
@ -314,6 +322,7 @@ pub async fn jmap_tests() {
)
.await;
webhooks::test(&mut params).await;
email_query::test(&mut params, delete).await;
email_get::test(&mut params).await;
email_set::test(&mut params).await;
@ -379,6 +388,7 @@ pub struct JMAPTest {
client: Client,
directory: DirectoryStore,
temp_dir: TempDir,
webhook: Arc<MockWebhookEndpoint>,
shutdown_tx: watch::Sender<bool>,
}
@ -485,9 +495,18 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest {
// Parse acceptors
servers.parse_tcp_acceptors(&mut config, shared_core.clone());
// Init servers
// Spawn webhook manager
let webhook_tx = spawn_webhook_manager(shared_core.clone());
// Setup IPC channels
let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER);
let smtp = SMTP::init(&mut config, shared_core.clone(), delivery_tx).await;
let ipc = Ipc {
delivery_tx,
webhook_tx,
};
// Init servers
let smtp = SMTP::init(&mut config, shared_core.clone(), ipc).await;
let jmap = JMAP::init(
&mut config,
delivery_rx,
@ -569,6 +588,7 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest {
client,
directory,
shutdown_tx,
webhook: spawn_mock_webhook_endpoint(),
}
}

View file

@ -27,7 +27,10 @@ use crate::{
jmap::{assert_is_empty, mailbox::destroy_all_mailboxes},
store::deflate_test_resource,
};
use jmap::{email::ingest::IngestEmail, IngestError};
use jmap::{
email::ingest::{IngestEmail, IngestSource},
IngestError,
};
use jmap_client::{email, mailbox::Role};
use jmap_proto::types::{collection::Collection, id::Id};
use mail_parser::{mailbox::mbox::MessageIterator, MessageParser};
@ -264,7 +267,7 @@ async fn test_multi_thread(params: &mut JMAPTest) {
mailbox_ids: vec![mailbox_id],
keywords: vec![],
received_at: None,
skip_duplicates: true,
source: IngestSource::Smtp,
encrypt: false,
})
.await

183
tests/src/jmap/webhooks.rs Normal file
View file

@ -0,0 +1,183 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use base64::{engine::general_purpose::STANDARD, Engine};
use common::{
manager::webadmin::Resource,
webhooks::{WebhookEvent, WebhookEvents},
};
use hyper::{body, server::conn::http1, service::service_fn};
use hyper_util::rt::TokioIo;
use jmap::api::http::{fetch_body, ToHttpResponse};
use jmap_proto::error::request::RequestError;
use ring::hmac;
use store::parking_lot::Mutex;
use tokio::{net::TcpListener, sync::watch};
use super::JMAPTest;
pub struct MockWebhookEndpoint {
pub tx: watch::Sender<bool>,
pub events: Mutex<Vec<WebhookEvent>>,
pub reject: AtomicBool,
}
pub async fn test(params: &mut JMAPTest) {
println!("Running Webhook tests...");
// Webhooks endpoint starts disabled by default, make sure there are no events.
tokio::time::sleep(Duration::from_millis(200)).await;
params.webhook.assert_is_empty();
// Enable the endpoint
params.webhook.accept();
tokio::time::sleep(Duration::from_millis(1000)).await;
// Check for events
params.webhook.assert_contains(&["auth.success"]);
}
impl MockWebhookEndpoint {
pub fn assert_contains(&self, expected: &[&str]) {
let events =
serde_json::to_string_pretty(&self.events.lock().drain(..).collect::<Vec<_>>())
.unwrap();
for string in expected {
if !events.contains(string) {
panic!(
"Expected events to contain '{}', but it did not. Events: {}",
string, events
);
}
}
}
pub fn accept(&self) {
self.reject.store(false, Ordering::Relaxed);
}
pub fn reject(&self) {
self.reject.store(true, Ordering::Relaxed);
}
pub fn clear(&self) {
self.events.lock().clear();
}
pub fn assert_is_empty(&self) {
assert!(self.events.lock().is_empty());
}
}
pub fn spawn_mock_webhook_endpoint() -> Arc<MockWebhookEndpoint> {
let (tx, rx) = watch::channel(true);
let endpoint_ = Arc::new(MockWebhookEndpoint {
tx,
events: Mutex::new(vec![]),
reject: true.into(),
});
let endpoint = endpoint_.clone();
tokio::spawn(async move {
let listener = TcpListener::bind("127.0.0.1:8821")
.await
.unwrap_or_else(|e| {
panic!("Failed to bind mock Milter server to 127.0.0.1:8821: {e}");
});
let mut rx_ = rx.clone();
loop {
tokio::select! {
stream = listener.accept() => {
match stream {
Ok((stream, _)) => {
let _ = http1::Builder::new()
.keep_alive(false)
.serve_connection(
TokioIo::new(stream),
service_fn(|mut req: hyper::Request<body::Incoming>| {
let endpoint = endpoint.clone();
async move {
// Verify HMAC signature
let key = hmac::Key::new(hmac::HMAC_SHA256, "ovos-moles".as_bytes());
let body = fetch_body(&mut req, 1024 * 1024).await.unwrap();
let tag = STANDARD.decode(req.headers().get("X-Signature").unwrap().to_str().unwrap()).unwrap();
hmac::verify(&key, &body, &tag).expect("Invalid signature");
// Deserialize JSON
let request = serde_json::from_slice::<WebhookEvents>(&body)
.expect("Failed to parse JSON");
if !endpoint.reject.load(Ordering::Relaxed) {
//let c = print!("received webhook: {}", serde_json::to_string_pretty(&request).unwrap());
// Add events
endpoint.events.lock().extend(request.events);
Ok::<_, hyper::Error>(
Resource {
content_type: "application/json",
contents: "[]".to_string().into_bytes(),
}
.into_http_response(),
)
} else {
//let c = print!("rejected webhook: {}", serde_json::to_string_pretty(&request).unwrap());
Ok::<_, hyper::Error>(
RequestError::not_found().into_http_response()
)
}
}
}),
)
.await;
}
Err(err) => {
panic!("Something went wrong: {err}" );
}
}
},
_ = rx_.changed() => {
//println!("Mock jMilter server stopping");
break;
}
};
}
});
endpoint_
}