diff --git a/Cargo.lock b/Cargo.lock index cd96513d..ea04b644 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,7 +202,7 @@ dependencies = [ "base64ct", "blake2", "cpufeatures", - "password-hash 0.5.0", + "password-hash", ] [[package]] @@ -369,18 +369,17 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "attohttpc" -version = "0.22.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fcf00bc6d5abb29b5f97e3c61a90b6d3caa12f3faf897d4a3e3607c050a35a7" +checksum = "0f77d243921b0979fbbd728dd2d5162e68ac8252976797c24eb5b3a6af9090dc" dependencies = [ "http 0.2.12", "log", - "rustls 0.20.9", + "rustls 0.21.12", "serde", "serde_json", "url", - "webpki", - "webpki-roots 0.22.6", + "webpki-roots 0.25.4", ] [[package]] @@ -391,14 +390,14 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "aws-creds" -version = "0.34.1" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3776743bb68d4ad02ba30ba8f64373f1be4e082fe47651767171ce75bb2f6cf5" +checksum = "390ad3b77f3e21e01a4a0355865853b681daf1988510b0b15e31c0c4ae7eb0f6" dependencies = [ "attohttpc", - "dirs", + "home", "log", - "quick-xml 0.26.0", + "quick-xml 0.30.0", "rust-ini", "serde", "thiserror", @@ -633,7 +632,7 @@ dependencies = [ "arrayvec", "cc", "cfg-if", - "constant_time_eq 0.3.0", + "constant_time_eq", ] [[package]] @@ -757,9 +756,9 @@ dependencies = [ [[package]] name = "bytemuck" -version = "1.16.0" +version = "1.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5" +checksum = "b236fc92302c97ed75b38da1f4917b5cdda4984745740f153a5d3059e48d725e" [[package]] name = "byteorder" @@ -1024,9 +1023,9 @@ dependencies = [ "futures", "hostname 0.4.0", "hyper 1.3.1", - "idna 0.5.0", - "imagesize", - "infer", + "idna 1.0.1", + "imagesize 0.13.0", + "infer 0.16.0", "jmap_proto", "mail-auth", "mail-parser", @@ -1067,7 +1066,8 @@ dependencies = [ "utils", "whatlang", "x509-parser 0.16.0", - "zip 0.6.6", + "xxhash-rust", + "zip", ] [[package]] @@ -1089,18 +1089,32 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + [[package]] name = "const_panic" version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6051f239ecec86fde3410901ab7860d458d160371533842974fc61f96d15879b" -[[package]] -name = "constant_time_eq" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" - [[package]] name = "constant_time_eq" version = "0.3.0" @@ -1289,16 +1303,15 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "4.1.2" +version = "4.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", "digest 0.10.7", "fiat-crypto", - "platforms", "rustc_version 0.4.0", "subtle", "zeroize", @@ -1426,12 +1439,25 @@ dependencies = [ ] [[package]] -name = "deadpool-postgres" +name = "deadpool" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda39fa1cfff190d8924d447ad04fd22772c250438ca5ce1dfb3c80621c05aaa" +checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" dependencies = [ - "deadpool", + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-postgres" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab8a4ea925ce79678034870834602a2980f4b88c09e97feb266496dbb4493d2" +dependencies = [ + "async-trait", + "deadpool 0.12.1", + "getrandom", "tokio", "tokio-postgres", "tracing", @@ -1448,9 +1474,9 @@ dependencies = [ [[package]] name = "decancer" -version = "3.2.0" +version = "3.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a053d1eb02357b64ce1a85133b1919466fc164a87fae45c11330dd41d0483b1e" +checksum = "89dc48fa5c407ad29590bc5cc17a56684ed7b4f8215a9b65a31644ff051557f4" dependencies = [ "lazy_static", "paste", @@ -1591,7 +1617,7 @@ dependencies = [ "ahash 0.8.11", "argon2", "async-trait", - "deadpool", + "deadpool 0.10.0", "futures", "jmap_proto", "ldap3", @@ -1601,8 +1627,8 @@ dependencies = [ "mail-send", "md5", "parking_lot", - "password-hash 0.5.0", - "pbkdf2 0.12.2", + "password-hash", + "pbkdf2", "pwhash", "regex", "rustls 0.22.4", @@ -1619,15 +1645,6 @@ dependencies = [ "utils", ] -[[package]] -name = "dirs" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" -dependencies = [ - "dirs-sys", -] - [[package]] name = "dirs-next" version = "2.0.0" @@ -1638,17 +1655,6 @@ dependencies = [ "dirs-sys-next", ] -[[package]] -name = "dirs-sys" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" -dependencies = [ - "libc", - "redox_users", - "winapi", -] - [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -1673,9 +1679,12 @@ dependencies = [ [[package]] name = "dlv-list" -version = "0.3.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] [[package]] name = "dns-update" @@ -1950,6 +1959,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95765f67b4b18863968b4a1bd5bb576f732b29a4a28c7cd84c09fa3e2875f33c" +[[package]] +name = "fastrand" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" + [[package]] name = "ff" version = "0.13.0" @@ -2360,6 +2375,12 @@ dependencies = [ "ahash 0.7.8", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "hashbrown" version = "0.14.5" @@ -2699,6 +2720,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.29", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-util" version = "0.1.5" @@ -2897,9 +2931,9 @@ dependencies = [ [[package]] name = "idna" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4716a3a0933a1d01c2f72450e89596eb51dd34ef3c211ccd875acdf1f8fe47ed" +checksum = "44a986806a1cc899952ba462bc1f28afbfd5850ab6cb030ccb20dd02cc527a24" dependencies = [ "icu_normalizer", "icu_properties", @@ -2913,6 +2947,12 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "029d73f573d8e8d63e6d5020011d3255b28c3ba85d6cf870a07184ed23de9284" +[[package]] +name = "imagesize" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edcd27d72f2f071c64249075f42e205ff93c9a4c5f6c6da53e79ed9f9832c285" + [[package]] name = "imap" version = "0.8.1" @@ -2993,6 +3033,15 @@ dependencies = [ "cfb", ] +[[package]] +name = "infer" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc150e5ce2330295b8616ce0e3f53250e53af31759a9dbedad1621ba29151847" +dependencies = [ + "cfb", +] + [[package]] name = "inout" version = "0.1.3" @@ -3496,9 +3545,9 @@ dependencies = [ [[package]] name = "mail-auth" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "282cd8ca9254ca13a30f3b6d940057de71ae568303497c7aa4fbab36be2f0aea" +checksum = "9bd9d657de66a3d5ac360c3eab8c9f5cac2565f2b97cc032d5de4c900ef470de" dependencies = [ "ahash 0.8.11", "flate2", @@ -3507,14 +3556,14 @@ dependencies = [ "mail-builder", "mail-parser", "parking_lot", - "quick-xml 0.31.0", + "quick-xml 0.32.0", "rand", "ring 0.17.8", "rsa", "rustls-pemfile 2.1.2", "serde", "serde_json", - "zip 2.1.3", + "zip", ] [[package]] @@ -3818,7 +3867,24 @@ dependencies = [ "thiserror", "time", "uuid", - "zstd 0.13.1", + "zstd", +] + +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", ] [[package]] @@ -4160,12 +4226,12 @@ dependencies = [ [[package]] name = "ordered-multimap" -version = "0.4.3" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a" +checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e" dependencies = [ "dlv-list", - "hashbrown 0.12.3", + "hashbrown 0.13.2", ] [[package]] @@ -4209,17 +4275,6 @@ dependencies = [ "windows-targets 0.52.5", ] -[[package]] -name = "password-hash" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700" -dependencies = [ - "base64ct", - "rand_core", - "subtle", -] - [[package]] name = "password-hash" version = "0.5.0" @@ -4237,18 +4292,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" -[[package]] -name = "pbkdf2" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83a0692ec44e4cf1ef28ca317f14f8f07da2d95ec3fa01f86e4467b725e60917" -dependencies = [ - "digest 0.10.7", - "hmac 0.12.1", - "password-hash 0.4.2", - "sha2 0.10.8", -] - [[package]] name = "pbkdf2" version = "0.12.2" @@ -4257,7 +4300,7 @@ checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" dependencies = [ "digest 0.10.7", "hmac 0.12.1", - "password-hash 0.5.0", + "password-hash", "sha2 0.10.8", ] @@ -4416,12 +4459,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" -[[package]] -name = "platforms" -version = "3.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7" - [[package]] name = "polyval" version = "0.6.2" @@ -4666,9 +4703,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quick-xml" -version = "0.26.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd" +checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" dependencies = [ "memchr", "serde", @@ -4683,6 +4720,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "quick-xml" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2" +dependencies = [ + "memchr", +] + [[package]] name = "quinn" version = "0.11.2" @@ -5273,9 +5319,9 @@ dependencies = [ [[package]] name = "rust-ini" -version = "0.18.0" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df" +checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091" dependencies = [ "cfg-if", "ordered-multimap", @@ -5283,32 +5329,36 @@ dependencies = [ [[package]] name = "rust-s3" -version = "0.33.0" +version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b2ac5ff6acfbe74226fa701b5ef793aaa054055c13ebb7060ad36942956e027" +checksum = "c6679da8efaf4c6f0c161de0961dfe95fb6e9049c398d6fbdada2639f053aedb" dependencies = [ "async-trait", "aws-creds", "aws-region", - "base64 0.13.1", + "base64 0.21.7", "bytes", "cfg-if", "futures", "hex", "hmac 0.12.1", "http 0.2.12", + "hyper 0.14.29", + "hyper-tls", "log", "maybe-async", "md5", + "native-tls", "percent-encoding", - "quick-xml 0.26.0", - "reqwest 0.11.27", + "quick-xml 0.30.0", "serde", "serde_derive", + "serde_json", "sha2 0.10.8", "thiserror", "time", "tokio", + "tokio-native-tls", "tokio-stream", "url", ] @@ -5391,18 +5441,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.20.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" -dependencies = [ - "log", - "ring 0.16.20", - "sct", - "webpki", -] - [[package]] name = "rustls" version = "0.21.12" @@ -5589,8 +5627,8 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" dependencies = [ - "password-hash 0.5.0", - "pbkdf2 0.12.2", + "password-hash", + "pbkdf2", "salsa20", "sha2 0.10.8", ] @@ -5997,8 +6035,8 @@ dependencies = [ "hyper 1.3.1", "hyper-util", "idna 0.5.0", - "imagesize", - "infer", + "imagesize 0.12.0", + "infer 0.15.0", "lazy_static", "lru-cache", "mail-auth", @@ -6145,7 +6183,7 @@ dependencies = [ "bitpacking", "blake3", "bytes", - "deadpool", + "deadpool 0.12.1", "deadpool-postgres", "elasticsearch", "farmhash", @@ -6329,6 +6367,18 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tempfile" +version = "3.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +dependencies = [ + "cfg-if", + "fastrand", + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "term" version = "0.7.0" @@ -6383,6 +6433,7 @@ dependencies = [ "pop3", "rayon", "reqwest 0.12.5", + "ring 0.17.8", "rustls 0.22.4", "rustls-pemfile 2.1.2", "rustls-pki-types", @@ -6545,6 +6596,16 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.10" @@ -6983,12 +7044,12 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.1" +version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c25da092f0a868cdf09e8674cd3b7ef3a7d92a24253e663a2fb85e2496de56" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", - "idna 1.0.0", + "idna 0.5.0", "percent-encoding", ] @@ -7232,15 +7293,6 @@ dependencies = [ "untrusted 0.9.0", ] -[[package]] -name = "webpki-roots" -version = "0.22.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" -dependencies = [ - "webpki", -] - [[package]] name = "webpki-roots" version = "0.25.4" @@ -7707,26 +7759,6 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "zip" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" -dependencies = [ - "aes", - "byteorder", - "bzip2", - "constant_time_eq 0.1.5", - "crc32fast", - "crossbeam-utils", - "flate2", - "hmac 0.12.1", - "pbkdf2 0.11.0", - "sha1", - "time", - "zstd 0.11.2+zstd.1.5.2", -] - [[package]] name = "zip" version = "2.1.3" @@ -7736,7 +7768,7 @@ dependencies = [ "aes", "arbitrary", "bzip2", - "constant_time_eq 0.3.0", + "constant_time_eq", "crc32fast", "crossbeam-utils", "deflate64", @@ -7746,14 +7778,14 @@ dependencies = [ "indexmap 2.2.6", "lzma-rs", "memchr", - "pbkdf2 0.12.2", + "pbkdf2", "rand", "sha1", "thiserror", "time", "zeroize", "zopfli", - "zstd 0.13.1", + "zstd", ] [[package]] @@ -7770,32 +7802,13 @@ dependencies = [ "simd-adler32", ] -[[package]] -name = "zstd" -version = "0.11.2+zstd.1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" -dependencies = [ - "zstd-safe 5.0.2+zstd.1.5.2", -] - [[package]] name = "zstd" version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" dependencies = [ - "zstd-safe 7.1.0", -] - -[[package]] -name = "zstd-safe" -version = "5.0.2+zstd.1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" -dependencies = [ - "libc", - "zstd-sys", + "zstd-safe", ] [[package]] diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 8c309d46..6910318f 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -45,19 +45,20 @@ opentelemetry = { version = "0.22.0" } opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.15.0", features = ["http-proto", "reqwest-client"] } opentelemetry-semantic-conventions = { version = "0.14.0" } -imagesize = "0.12" +imagesize = "0.13" sha1 = "0.10" sha2 = "0.10.6" md5 = "0.7.0" whatlang = "0.16" -idna = "0.5" +idna = "1.0" decancer = "3.0.1" unicode-security = "0.1.0" -infer = "0.15.0" +infer = "0.16" bincode = "1.3.1" hostname = "0.4.0" -zip = "0.6.6" +zip = "2.1" pwhash = "1.0.0" +xxhash-rust = { version = "0.8.5", features = ["xxh3"] } [target.'cfg(unix)'.dependencies] privdrop = "0.5.3" diff --git a/crates/common/src/config/imap.rs b/crates/common/src/config/imap.rs index 723bb1b1..0e734ba3 100644 --- a/crates/common/src/config/imap.rs +++ b/crates/common/src/config/imap.rs @@ -1,3 +1,26 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + use std::time::Duration; use utils::config::{Config, Rate}; diff --git a/crates/common/src/config/mod.rs b/crates/common/src/config/mod.rs index e2540357..1f9d817f 100644 --- a/crates/common/src/config/mod.rs +++ b/crates/common/src/config/mod.rs @@ -1,3 +1,26 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + use std::sync::Arc; use arc_swap::ArcSwap; @@ -5,7 +28,10 @@ use directory::{Directories, Directory}; use store::{BlobBackend, BlobStore, FtsStore, LookupStore, Store, Stores}; use utils::config::Config; -use crate::{expr::*, listener::tls::TlsManager, manager::config::ConfigManager, Core, Network}; +use crate::{ + expr::*, listener::tls::TlsManager, manager::config::ConfigManager, webhooks::Webhooks, Core, + Network, +}; use self::{ imap::ImapConfig, jmap::settings::JmapConfig, scripts::Scripting, smtp::SmtpConfig, @@ -130,6 +156,7 @@ impl Core { jmap: JmapConfig::parse(config), imap: ImapConfig::parse(config), tls: TlsManager::parse(config), + web_hooks: Webhooks::parse(config), storage: Storage { data, blob, diff --git a/crates/common/src/config/network.rs b/crates/common/src/config/network.rs index 05fc2b35..f33c2ef6 100644 --- a/crates/common/src/config/network.rs +++ b/crates/common/src/config/network.rs @@ -1,10 +1,41 @@ -use utils::config::Config; +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::{str::FromStr, time::Duration}; use crate::{ expr::{if_block::IfBlock, tokenizer::TokenMap}, listener::blocked::{AllowedIps, BlockedIps}, + webhooks::{Webhook, WebhookType, Webhooks}, Network, }; +use ahash::AHashSet; +use base64::{engine::general_purpose::STANDARD, Engine}; +use hyper::{ + header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, + HeaderMap, +}; +use utils::config::Config; use super::CONNECTION_VARS; @@ -40,3 +71,132 @@ impl Network { network } } + +impl Webhooks { + pub fn parse(config: &mut Config) -> Self { + let mut hooks = Webhooks { + events: Default::default(), + hooks: Default::default(), + }; + + for id in config + .sub_keys("webhook", ".url") + .map(|s| s.to_string()) + .collect::>() + { + if let Some(webhook) = parse_webhook(config, &id) { + hooks.events.extend(&webhook.events); + hooks.hooks.insert(webhook.id, webhook.into()); + } + } + + hooks + } +} + +fn parse_webhook(config: &mut Config, id: &str) -> Option { + let mut headers = HeaderMap::new(); + + for (header, value) in config + .values(("webhook", id, "headers")) + .map(|(_, v)| { + if let Some((k, v)) = v.split_once(':') { + Ok(( + HeaderName::from_str(k.trim()).map_err(|err| { + format!("Invalid header found in property \"webhook.{id}.headers\": {err}",) + })?, + HeaderValue::from_str(v.trim()).map_err(|err| { + format!("Invalid header found in property \"webhook.{id}.headers\": {err}",) + })?, + )) + } else { + Err(format!( + "Invalid header found in property \"webhook.{id}.headers\": {v}", + )) + } + }) + .collect::, String>>() + .map_err(|e| config.new_parse_error(("webhook", id, "headers"), e)) + .unwrap_or_default() + { + headers.insert(header, value); + } + + headers.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + if let (Some(name), Some(secret)) = ( + config.value(("webhook", id, "user")), + config.value(("webhook", id, "secret")), + ) { + headers.insert( + AUTHORIZATION, + format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret))) + .parse() + .unwrap(), + ); + } + + // Parse webhook events + let mut events = AHashSet::new(); + let mut parse_errors = Vec::new(); + for (_, value) in config.values(("webhook", id, "events")) { + match WebhookType::from_str(value) { + Ok(event) => { + events.insert(event); + } + Err(err) => { + parse_errors.push(err); + } + } + } + if !parse_errors.is_empty() { + config.new_parse_error( + ("webhook", id, "events"), + format!("Invalid webhook events: {}", parse_errors.join(", ")), + ); + } + + let url = config.value_require(("webhook", id, "url"))?.to_string(); + Some(Webhook { + id: xxhash_rust::xxh3::xxh3_64(url.as_bytes()), + url, + timeout: config + .property_or_default(("webhook", id, "timeout"), "30s") + .unwrap_or_else(|| Duration::from_secs(30)), + tls_allow_invalid_certs: config + .property_or_default(("webhook", id, "allow-invalid-certs"), "false") + .unwrap_or_default(), + headers, + key: config + .value(("webhook", id, "signature-key")) + .unwrap_or_default() + .to_string(), + throttle: config + .property_or_default(("webhook", id, "throttle"), "1s") + .unwrap_or_else(|| Duration::from_secs(1)), + events, + }) +} + +impl FromStr for WebhookType { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "auth.success" => Ok(Self::AuthSuccess), + "auth.failure" => Ok(Self::AuthFailure), + "auth.banned" => Ok(Self::AuthBanned), + "auth.error" => Ok(Self::AuthError), + "message.accepted" => Ok(Self::MessageAccepted), + "message.rejected" => Ok(Self::MessageRejected), + "message.appended" => Ok(Self::MessageAppended), + "account.over-quota" => Ok(Self::AccountOverQuota), + "dsn" => Ok(Self::DSN), + "double-bounce" => Ok(Self::DoubleBounce), + "report.incoming.dmarc" => Ok(Self::IncomingDmarcReport), + "report.incoming.tls" => Ok(Self::IncomingTlsReport), + "report.incoming.arf" => Ok(Self::IncomingArfReport), + "report.outgoing" => Ok(Self::OutgoingReport), + _ => Err(s.to_string()), + } + } +} diff --git a/crates/common/src/config/scripts.rs b/crates/common/src/config/scripts.rs index ce15bed2..e416a9c8 100644 --- a/crates/common/src/config/scripts.rs +++ b/crates/common/src/config/scripts.rs @@ -1,3 +1,26 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + use std::{ collections::HashSet, sync::Arc, @@ -24,6 +47,9 @@ pub struct Scripting { pub return_path: IfBlock, pub sign: IfBlock, pub scripts: AHashMap>, +} + +pub struct ScriptCache { pub bayes_cache: BayesTokenCache, pub remote_lists: RwLock>, } @@ -307,29 +333,32 @@ impl Scripting { "'MAILER-DAEMON@' + key_get('default', 'domain')", ) }), - from_name: IfBlock::try_parse(config, "sieve.trusted.from-name", &token_map) + from_name: IfBlock::try_parse(config, "sieve.trusted.from-name", &token_map) .unwrap_or_else(|| { - IfBlock::new::<()>( - "sieve.trusted.from-name", - [], - "'Automated Message'", - ) + IfBlock::new::<()>("sieve.trusted.from-name", [], "'Automated Message'") }), - return_path: IfBlock::try_parse(config, "sieve.trusted.return-path", &token_map) - .unwrap_or_else(|| { - IfBlock::empty( - "sieve.trusted.return-path", - ) - }), - sign: IfBlock::try_parse(config, "sieve.trusted.sign", &token_map) - .unwrap_or_else(|| { + return_path: IfBlock::try_parse(config, "sieve.trusted.return-path", &token_map) + .unwrap_or_else(|| IfBlock::empty("sieve.trusted.return-path")), + sign: IfBlock::try_parse(config, "sieve.trusted.sign", &token_map).unwrap_or_else( + || { IfBlock::new::<()>( "sieve.trusted.sign", [], - "['rsa-' + key_get('default', 'domain'), 'ed25519-' + key_get('default', 'domain')]", + concat!( + "['rsa-' + key_get('default', 'domain'), ", + "'ed25519-' + key_get('default', 'domain')]" + ), ) - }), + }, + ), scripts, + } + } +} + +impl ScriptCache { + pub fn parse(config: &mut Config) -> Self { + ScriptCache { bayes_cache: BayesTokenCache::new( config .property_or_default("cache.bayes.capacity", "8192") @@ -362,9 +391,19 @@ impl Default for Scripting { sign: IfBlock::new::<()>( "sieve.trusted.sign", [], - "['rsa-' + key_get('default', 'domain'), 'ed25519-' + key_get('default', 'domain')]", + concat!( + "['rsa-' + key_get('default', 'domain'), ", + "'ed25519-' + key_get('default', 'domain')]" + ), ), scripts: AHashMap::new(), + } + } +} + +impl Default for ScriptCache { + fn default() -> Self { + Self { bayes_cache: BayesTokenCache::new( 8192, Duration::from_secs(3600), @@ -386,8 +425,6 @@ impl Clone for Scripting { return_path: self.return_path.clone(), sign: self.sign.clone(), scripts: self.scripts.clone(), - bayes_cache: self.bayes_cache.clone(), - remote_lists: RwLock::new(self.remote_lists.read().clone()), } } } diff --git a/crates/common/src/config/server/mod.rs b/crates/common/src/config/server/mod.rs index e8e4acf3..6b1b3232 100644 --- a/crates/common/src/config/server/mod.rs +++ b/crates/common/src/config/server/mod.rs @@ -1,6 +1,7 @@ use std::{fmt::Display, net::SocketAddr, time::Duration}; use ahash::AHashMap; +use serde::{Deserialize, Serialize}; use tokio::net::TcpSocket; use utils::config::ipmask::IpAddrMask; @@ -36,7 +37,7 @@ pub struct Listener { pub nodelay: bool, } -#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)] pub enum ServerProtocol { #[default] Smtp, diff --git a/crates/common/src/config/smtp/session.rs b/crates/common/src/config/smtp/session.rs index 8c22167e..6ea0b469 100644 --- a/crates/common/src/config/smtp/session.rs +++ b/crates/common/src/config/smtp/session.rs @@ -6,7 +6,10 @@ use std::{ use ahash::AHashSet; use base64::{engine::general_purpose::STANDARD, Engine}; -use hyper::{header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, HeaderMap}; +use hyper::{ + header::{HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, + HeaderMap, +}; use smtp_proto::*; use utils::config::{utils::ParseValue, Config}; @@ -203,14 +206,14 @@ impl SessionConfig { session.rcpt.catch_all = AddressMapping::parse(config, "session.rcpt.catch-all"); session.rcpt.subaddressing = AddressMapping::parse(config, "session.rcpt.sub-addressing"); session.milters = config - .sub_keys("session.milter", "") + .sub_keys("session.milter", ".hostname") .map(|s| s.to_string()) .collect::>() .into_iter() .filter_map(|id| parse_milter(config, &id, &has_rcpt_vars)) .collect(); session.jmilters = config - .sub_keys("session.jmilter", "") + .sub_keys("session.jmilter", ".url") .map(|s| s.to_string()) .collect::>() .into_iter() @@ -589,20 +592,17 @@ fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option< HeaderName::from_str(k.trim()).map_err(|err| { format!( "Invalid header found in property \"session.jmilter.{id}.headers\": {err}", - ) })?, HeaderValue::from_str(v.trim()).map_err(|err| { format!( "Invalid header found in property \"session.jmilter.{id}.headers\": {err}", - ) })?, )) } else { Err(format!( "Invalid header found in property \"session.jmilter.{id}.headers\": {v}", - )) } }) @@ -614,8 +614,16 @@ fn parse_jmilter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option< } headers.insert(CONTENT_TYPE, "application/json".parse().unwrap()); - if let (Some(name), Some(secret)) = (config.value(("session.jmilter", id, "user")), config.value(("session.jmilter", id, "secret"))) { - headers.insert(AUTHORIZATION, format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret))).parse().unwrap()); + if let (Some(name), Some(secret)) = ( + config.value(("session.jmilter", id, "user")), + config.value(("session.jmilter", id, "secret")), + ) { + headers.insert( + AUTHORIZATION, + format!("Basic {}", STANDARD.encode(format!("{}:{}", name, secret))) + .parse() + .unwrap(), + ); } Some(JMilter { diff --git a/crates/common/src/config/storage.rs b/crates/common/src/config/storage.rs index 30e8575b..f34248f3 100644 --- a/crates/common/src/config/storage.rs +++ b/crates/common/src/config/storage.rs @@ -1,3 +1,26 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + use std::sync::Arc; use ahash::AHashMap; diff --git a/crates/common/src/config/tracers.rs b/crates/common/src/config/tracers.rs index af24f076..f8ea7b05 100644 --- a/crates/common/src/config/tracers.rs +++ b/crates/common/src/config/tracers.rs @@ -1,3 +1,26 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + use std::{collections::HashMap, str::FromStr}; use opentelemetry_otlp::{HttpExporterBuilder, TonicExporterBuilder, WithExportConfig}; diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 46950e98..872c4b64 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -28,6 +28,7 @@ use config::{ imap::ImapConfig, jmap::settings::JmapConfig, scripts::Scripting, + server::ServerProtocol, smtp::{ auth::{ArcSealer, DkimSigner}, queue::RelayHost, @@ -36,7 +37,7 @@ use config::{ storage::Storage, tracers::{OtelTracer, Tracer, Tracers}, }; -use directory::{core::secret::verify_secret_hash, Directory, Principal, QueryBy}; +use directory::{core::secret::verify_secret_hash, Directory, Principal, QueryBy, Type}; use expr::if_block::IfBlock; use listener::{ blocked::{AllowedIps, BlockedIps}, @@ -51,12 +52,13 @@ use opentelemetry_sdk::{ use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION}; use sieve::Sieve; use store::LookupStore; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{ layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry, }; use utils::{config::Config, BlobHash}; +use webhooks::{manager::WebhookEvent, WebhookPayload, WebhookType, Webhooks}; pub mod addresses; pub mod config; @@ -64,10 +66,13 @@ pub mod expr; pub mod listener; pub mod manager; pub mod scripts; +pub mod webhooks; pub static USER_AGENT: &str = concat!("StalwartMail/", env!("CARGO_PKG_VERSION"),); pub static DAEMON_NAME: &str = concat!("Stalwart Mail Server v", env!("CARGO_PKG_VERSION"),); +pub const IPC_CHANNEL_BUFFER: usize = 1024; + pub type SharedCore = Arc>; #[derive(Clone, Default)] @@ -79,6 +84,7 @@ pub struct Core { pub smtp: SmtpConfig, pub jmap: JmapConfig, pub imap: ImapConfig, + pub web_hooks: Webhooks, } #[derive(Clone)] @@ -103,6 +109,11 @@ pub enum DeliveryEvent { Stop, } +pub struct Ipc { + pub delivery_tx: mpsc::Sender, + pub webhook_tx: mpsc::Sender, +} + #[derive(Debug)] pub struct IngestMessage { pub sender_address: String, @@ -230,8 +241,10 @@ impl Core { pub async fn authenticate( &self, directory: &Directory, + ipc: &Ipc, credentials: &Credentials, remote_ip: IpAddr, + protocol: ServerProtocol, return_member_of: bool, ) -> directory::Result>> { // First try to authenticate the user against the default directory @@ -239,7 +252,24 @@ impl Core { .query(QueryBy::Credentials(credentials), return_member_of) .await { - Ok(Some(principal)) => return Ok(AuthResult::Success(principal)), + Ok(Some(principal)) => { + // Send webhook event + if self.has_webhook_subscribers(WebhookType::AuthSuccess) { + ipc.send_webhook( + WebhookType::AuthSuccess, + WebhookPayload::Authentication { + login: credentials.login().to_string(), + protocol, + remote_ip, + typ: principal.typ.into(), + as_master: None, + }, + ) + .await; + } + + return Ok(AuthResult::Success(principal)); + } Ok(None) => Ok(()), Err(err) => Err(err), }; @@ -254,6 +284,20 @@ impl Core { if username == fallback_admin => { if verify_secret_hash(fallback_pass, secret).await { + // Send webhook event + if self.has_webhook_subscribers(WebhookType::AuthSuccess) { + ipc.send_webhook( + WebhookType::AuthSuccess, + WebhookPayload::Authentication { + login: username.to_string(), + protocol, + remote_ip, + typ: Type::Superuser.into(), + as_master: None, + }, + ) + .await; + } return Ok(AuthResult::Success(Principal::fallback_admin( fallback_pass, ))); @@ -270,8 +314,37 @@ impl Core { .query(QueryBy::Name(username), return_member_of) .await? { + // Send webhook event + if self.has_webhook_subscribers(WebhookType::AuthSuccess) { + ipc.send_webhook( + WebhookType::AuthSuccess, + WebhookPayload::Authentication { + login: username.to_string(), + protocol, + remote_ip, + typ: principal.typ.into(), + as_master: true.into(), + }, + ) + .await; + } AuthResult::Success(principal) } else { + // Send webhook event + if self.has_webhook_subscribers(WebhookType::AuthFailure) { + ipc.send_webhook( + WebhookType::AuthFailure, + WebhookPayload::Authentication { + login: username.to_string(), + protocol, + remote_ip, + typ: None, + as_master: true.into(), + }, + ) + .await; + } + AuthResult::Failure }, ); @@ -281,13 +354,20 @@ impl Core { } if let Err(err) = result { + // Send webhook event + if self.has_webhook_subscribers(WebhookType::AuthError) { + ipc.send_webhook( + WebhookType::AuthError, + WebhookPayload::Error { + message: err.to_string(), + }, + ) + .await; + } + Err(err) } else if self.has_fail2ban() { - let login = match credentials { - Credentials::Plain { username, .. } - | Credentials::XOauth2 { username, .. } - | Credentials::OAuthBearer { token: username } => username, - }; + let login = credentials.login(); if self.is_fail2banned(remote_ip, login.to_string()).await? { tracing::info!( context = "directory", @@ -297,11 +377,55 @@ impl Core { "IP address blocked after too many failed login attempts", ); + // Send webhook event + if self.has_webhook_subscribers(WebhookType::AuthBanned) { + ipc.send_webhook( + WebhookType::AuthBanned, + WebhookPayload::Authentication { + login: credentials.login().to_string(), + protocol, + remote_ip, + typ: None, + as_master: None, + }, + ) + .await; + } + Ok(AuthResult::Banned) } else { + // Send webhook event + if self.has_webhook_subscribers(WebhookType::AuthFailure) { + ipc.send_webhook( + WebhookType::AuthFailure, + WebhookPayload::Authentication { + login: credentials.login().to_string(), + protocol, + remote_ip, + typ: None, + as_master: None, + }, + ) + .await; + } + Ok(AuthResult::Failure) } } else { + // Send webhook event + if self.has_webhook_subscribers(WebhookType::AuthFailure) { + ipc.send_webhook( + WebhookType::AuthFailure, + WebhookPayload::Authentication { + login: credentials.login().to_string(), + protocol, + remote_ip, + typ: None, + as_master: None, + }, + ) + .await; + } Ok(AuthResult::Failure) } } @@ -421,3 +545,17 @@ impl Tracers { } } } + +trait CredentialsUsername { + fn login(&self) -> &str; +} + +impl CredentialsUsername for Credentials { + fn login(&self) -> &str { + match self { + Credentials::Plain { username, .. } + | Credentials::XOauth2 { username, .. } + | Credentials::OAuthBearer { token: username } => username, + } + } +} diff --git a/crates/common/src/listener/blocked.rs b/crates/common/src/listener/blocked.rs index 55975a98..74720067 100644 --- a/crates/common/src/listener/blocked.rs +++ b/crates/common/src/listener/blocked.rs @@ -21,11 +21,7 @@ * for more details. */ -use std::{ - fmt::Debug, - net::{IpAddr, Ipv4Addr, Ipv6Addr}, - sync::atomic::AtomicU8, -}; +use std::{fmt::Debug, net::IpAddr, sync::atomic::AtomicU8}; use ahash::AHashSet; use parking_lot::RwLock; @@ -113,9 +109,12 @@ impl AllowedIps { } } - // Add loopback addresses - ip_addresses.insert(IpAddr::V4(Ipv4Addr::LOCALHOST)); - ip_addresses.insert(IpAddr::V6(Ipv6Addr::LOCALHOST)); + #[cfg(not(feature = "test_mode"))] + { + // Add loopback addresses + ip_addresses.insert(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST)); + ip_addresses.insert(IpAddr::V6(std::net::IIpv6Addr::LOCALHOST)); + } AllowedIps { ip_addresses, @@ -210,14 +209,18 @@ impl Default for BlockedIps { } } +#[allow(clippy::derivable_impls)] impl Default for AllowedIps { fn default() -> Self { // Add IPv4 and IPv6 loopback addresses Self { + #[cfg(not(feature = "test_mode"))] ip_addresses: AHashSet::from_iter([ - IpAddr::V4(Ipv4Addr::LOCALHOST), - IpAddr::V6(Ipv6Addr::LOCALHOST), + IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), + IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), ]), + #[cfg(feature = "test_mode")] + ip_addresses: Default::default(), ip_networks: Default::default(), has_networks: Default::default(), } diff --git a/crates/common/src/manager/reload.rs b/crates/common/src/manager/reload.rs index 009751a6..5342766a 100644 --- a/crates/common/src/manager/reload.rs +++ b/crates/common/src/manager/reload.rs @@ -23,7 +23,6 @@ use ahash::AHashSet; use arc_swap::ArcSwap; -use parking_lot::RwLock; use store::Stores; use utils::config::{ipmask::IpAddrOrMask, utils::ParseValue, Config}; @@ -133,9 +132,6 @@ impl Core { if !config.errors.is_empty() { return Ok(config.into()); } - // Transfer Sieve cache - core.sieve.bayes_cache = self.sieve.bayes_cache.clone(); - core.sieve.remote_lists = RwLock::new(self.sieve.remote_lists.read().clone()); // Copy ACME certificates let mut certificates = core.tls.certificates.load().as_ref().clone(); diff --git a/crates/common/src/scripts/functions/image.rs b/crates/common/src/scripts/functions/image.rs index 034ca259..e0192ae5 100644 --- a/crates/common/src/scripts/functions/image.rs +++ b/crates/common/src/scripts/functions/image.rs @@ -33,14 +33,13 @@ pub fn fn_img_metadata<'x>(ctx: &'x Context<'x>, v: Vec) -> Variable { "type" => imagesize::image_type(bytes).ok().map(|t| { Variable::from(match t { imagesize::ImageType::Aseprite => "aseprite", - imagesize::ImageType::Avif => "avif", imagesize::ImageType::Bmp => "bmp", imagesize::ImageType::Dds => "dds", imagesize::ImageType::Exr => "exr", imagesize::ImageType::Farbfeld => "farbfeld", imagesize::ImageType::Gif => "gif", imagesize::ImageType::Hdr => "hdr", - imagesize::ImageType::Heif => "heif", + imagesize::ImageType::Heif(_) => "heif", imagesize::ImageType::Ico => "ico", imagesize::ImageType::Jpeg => "jpeg", imagesize::ImageType::Jxl => "jxl", @@ -53,6 +52,8 @@ pub fn fn_img_metadata<'x>(ctx: &'x Context<'x>, v: Vec) -> Variable { imagesize::ImageType::Tiff => "tiff", imagesize::ImageType::Vtf => "vtf", imagesize::ImageType::Webp => "webp", + imagesize::ImageType::Ilbm => "ilbm", + _ => "unknown", }) }), "width" => imagesize::blob_size(bytes) diff --git a/crates/common/src/scripts/plugins/bayes.rs b/crates/common/src/scripts/plugins/bayes.rs index b6b9b8e7..4f5a3ff5 100644 --- a/crates/common/src/scripts/plugins/bayes.rs +++ b/crates/common/src/scripts/plugins/bayes.rs @@ -116,7 +116,7 @@ async fn train(ctx: PluginContext<'_>, is_train: bool) -> Variable { ); // Update weight and invalidate cache - let bayes_cache = &ctx.core.sieve.bayes_cache; + let bayes_cache = &ctx.cache.bayes_cache; if is_train { for (hash, weights) in model.weights { if store @@ -209,7 +209,7 @@ pub async fn exec_classify(ctx: PluginContext<'_>) -> Variable { } // Obtain training counts - let bayes_cache = &ctx.core.sieve.bayes_cache; + let bayes_cache = &ctx.cache.bayes_cache; let (spam_learns, ham_learns) = if let Some(weights) = bayes_cache.get_or_update(TokenHash::default(), store).await { (weights.spam, weights.ham) @@ -286,7 +286,7 @@ pub async fn exec_is_balanced(ctx: PluginContext<'_>) -> Variable { let learn_spam = ctx.arguments[1].to_bool(); // Obtain training counts - let bayes_cache = &ctx.core.sieve.bayes_cache; + let bayes_cache = &ctx.cache.bayes_cache; let (spam_learns, ham_learns) = if let Some(weights) = bayes_cache.get_or_update(TokenHash::default(), store).await { (weights.spam as f64, weights.ham as f64) diff --git a/crates/common/src/scripts/plugins/lookup.rs b/crates/common/src/scripts/plugins/lookup.rs index 7f098eb0..9523baa2 100644 --- a/crates/common/src/scripts/plugins/lookup.rs +++ b/crates/common/src/scripts/plugins/lookup.rs @@ -180,7 +180,7 @@ pub async fn exec_remote(ctx: PluginContext<'_>) -> Variable { const MAX_ENTRY_SIZE: usize = 256; const MAX_ENTRIES: usize = 100000; - match ctx.core.sieve.remote_lists.read().get(resource.as_ref()) { + match ctx.cache.remote_lists.read().get(resource.as_ref()) { Some(remote_list) if remote_list.expires < Instant::now() => { return remote_list.entries.contains(item.as_ref()).into() } @@ -245,7 +245,7 @@ pub async fn exec_remote(ctx: PluginContext<'_>) -> Variable { }; // Lock remote list for writing - let mut _lock = ctx.core.sieve.remote_lists.write(); + let mut _lock = ctx.cache.remote_lists.write(); let list = _lock .entry(resource.to_string()) .or_insert_with(|| RemoteList { @@ -369,7 +369,7 @@ pub async fn exec_remote(ctx: PluginContext<'_>) -> Variable { } // Something went wrong, try again in one hour - let mut _lock = ctx.core.sieve.remote_lists.write(); + let mut _lock = ctx.cache.remote_lists.write(); let list = _lock .entry(resource.to_string()) .or_insert_with(|| RemoteList { diff --git a/crates/common/src/scripts/plugins/mod.rs b/crates/common/src/scripts/plugins/mod.rs index aab661a9..1f7af511 100644 --- a/crates/common/src/scripts/plugins/mod.rs +++ b/crates/common/src/scripts/plugins/mod.rs @@ -34,7 +34,7 @@ pub mod text; use mail_parser::Message; use sieve::{runtime::Variable, FunctionMap, Input}; -use crate::Core; +use crate::{config::scripts::ScriptCache, Core}; use super::ScriptModification; @@ -43,6 +43,7 @@ type RegisterPluginFnc = fn(u32, &mut FunctionMap) -> (); pub struct PluginContext<'x> { pub span: &'x tracing::Span, pub core: &'x Core, + pub cache: &'x ScriptCache, pub message: &'x Message<'x>, pub modifications: &'x mut Vec, pub arguments: Vec, diff --git a/crates/common/src/webhooks/collector.rs b/crates/common/src/webhooks/collector.rs new file mode 100644 index 00000000..916db121 --- /dev/null +++ b/crates/common/src/webhooks/collector.rs @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::sync::Arc; + +use crate::{Core, Ipc}; + +use super::{manager::WebhookEvent, WebhookPayload, WebhookType}; + +impl Core { + #[inline(always)] + pub fn has_webhook_subscribers(&self, event_type: WebhookType) -> bool { + self.web_hooks.events.contains(&event_type) + } +} + +impl Ipc { + pub async fn send_webhook(&self, event_type: WebhookType, payload: WebhookPayload) { + if let Err(err) = self + .webhook_tx + .send(WebhookEvent::Send { + typ: event_type, + payload: Arc::new(payload), + }) + .await + { + tracing::warn!("Failed to send webhook event: {:?}", err); + } + } +} diff --git a/crates/common/src/webhooks/manager.rs b/crates/common/src/webhooks/manager.rs new file mode 100644 index 00000000..063c79fc --- /dev/null +++ b/crates/common/src/webhooks/manager.rs @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use crate::{SharedCore, IPC_CHANNEL_BUFFER}; +use ahash::AHashMap; +use base64::{engine::general_purpose::STANDARD, Engine}; +use chrono::Utc; +use ring::hmac; +use tokio::sync::mpsc; +use utils::snowflake::SnowflakeIdGenerator; + +use super::{Webhook, WebhookEvents, WebhookPayload, WebhookType}; + +pub enum WebhookEvent { + Send { + typ: WebhookType, + payload: Arc, + }, + Success { + webhook_id: u64, + }, + Retry { + webhook_id: u64, + events: WebhookEvents, + }, + Stop, +} + +pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365); + +struct PendingEvents { + next_delivery: Instant, + pending_events: WebhookEvents, + retry_num: u32, + in_flight: bool, +} + +pub fn spawn_webhook_manager(core: SharedCore) -> mpsc::Sender { + let (webhook_tx, mut webhook_rx) = mpsc::channel(IPC_CHANNEL_BUFFER); + + let webhook_tx_ = webhook_tx.clone(); + + tokio::spawn(async move { + let mut wakeup_time = LONG_SLUMBER; + let mut pending_events: AHashMap = AHashMap::new(); + let id_generator = SnowflakeIdGenerator::new(); + + loop { + // Wait for the next event or timeout + let event_or_timeout = tokio::time::timeout(wakeup_time, webhook_rx.recv()).await; + + // Load settings + let core = core.load(); + + match event_or_timeout { + Ok(Some(event)) => match event { + WebhookEvent::Send { typ, payload } => { + for (webhook_id, webhook) in &core.web_hooks.hooks { + if webhook.events.contains(&typ) { + pending_events.entry(*webhook_id).or_default().push( + super::WebhookEvent { + id: id_generator.generate().unwrap_or_default(), + created_at: Utc::now(), + typ, + data: payload.clone(), + }, + ); + } + } + } + WebhookEvent::Success { webhook_id } => { + if let Some(pending_events) = pending_events.get_mut(&webhook_id) { + pending_events.success(); + } + } + WebhookEvent::Retry { webhook_id, events } => { + pending_events.entry(webhook_id).or_default().retry(events); + } + WebhookEvent::Stop => break, + }, + Ok(None) => { + break; + } + Err(_) => (), + } + + // Process events + let mut delete_ids = Vec::new(); + let mut next_retry = None; + for (webhook_id, events) in &mut pending_events { + if let Some(webhook) = core.web_hooks.hooks.get(webhook_id) { + if events.next_delivery <= Instant::now() { + if !events.is_empty() { + events.next_delivery = Instant::now() + webhook.throttle; + if !events.in_flight { + events.in_flight = true; + spawn_webhook_handler( + webhook.clone(), + events.take_events(), + webhook_tx.clone(), + ); + } + } else { + // No more events for webhook + delete_ids.push(*webhook_id); + } + } else if !events.is_empty() { + // Retry later + let this_retry = events.next_delivery - Instant::now(); + match next_retry { + Some(next_retry) if this_retry >= next_retry => {} + _ => { + next_retry = Some(this_retry); + } + } + } + } else { + delete_ids.push(*webhook_id); + } + } + wakeup_time = next_retry.unwrap_or(LONG_SLUMBER); + + // Delete removed or empty webhooks + for webhook_id in delete_ids { + pending_events.remove(&webhook_id); + } + } + }); + + webhook_tx_ +} + +fn spawn_webhook_handler( + webhook: Arc, + events: WebhookEvents, + webhook_tx: mpsc::Sender, +) { + tokio::spawn(async move { + let response = match post_webhook_events(&webhook, &events).await { + Ok(_) => WebhookEvent::Success { + webhook_id: webhook.id, + }, + Err(err) => { + tracing::warn!("Failed to post webhook events: {}", err); + WebhookEvent::Retry { + webhook_id: webhook.id, + events, + } + } + }; + + // Notify manager + if let Err(err) = webhook_tx.send(response).await { + tracing::error!("Failed to send webhook event: {}", err); + } + }); +} + +async fn post_webhook_events(webhook: &Webhook, events: &WebhookEvents) -> Result<(), String> { + // Serialize body + let body = serde_json::to_string(events) + .map_err(|err| format!("Failed to serialize events: {}", err))?; + + // Add HMAC-SHA256 signature + let mut headers = webhook.headers.clone(); + if !webhook.key.is_empty() { + let key = hmac::Key::new(hmac::HMAC_SHA256, webhook.key.as_bytes()); + let tag = hmac::sign(&key, body.as_bytes()); + + headers.insert( + "X-Signature", + STANDARD.encode(tag.as_ref()).parse().unwrap(), + ); + } + + // Send request + let response = reqwest::Client::builder() + .timeout(webhook.timeout) + .danger_accept_invalid_certs(webhook.tls_allow_invalid_certs) + .build() + .map_err(|err| format!("Failed to create HTTP client: {}", err))? + .post(&webhook.url) + .headers(headers) + .body(body) + .send() + .await + .map_err(|err| format!("Webhook request to {} failed: {err}", webhook.url))?; + + if response.status().is_success() { + Ok(()) + } else { + Err(format!( + "Webhook request to {} failed with code {}: {}", + webhook.url, + response.status().as_u16(), + response.status().canonical_reason().unwrap_or("Unknown") + )) + } +} + +impl Default for PendingEvents { + fn default() -> Self { + Self { + next_delivery: Instant::now(), + pending_events: WebhookEvents::default(), + retry_num: 0, + in_flight: false, + } + } +} + +impl PendingEvents { + pub fn push(&mut self, event: super::WebhookEvent) { + self.pending_events.events.push(event); + } + + pub fn success(&mut self) { + self.in_flight = false; + self.retry_num = 0; + } + + pub fn retry(&mut self, events: WebhookEvents) { + // Backoff + self.next_delivery = Instant::now() + Duration::from_secs(2u64.pow(self.retry_num)); + self.retry_num += 1; + self.in_flight = false; + + for event in events.events { + // Drop failed events older than 5 minutes + if event.created_at + Duration::from_secs(5 * 60) >= Utc::now() { + self.pending_events.events.push(event); + } + } + } + + pub fn is_empty(&self) -> bool { + self.pending_events.events.is_empty() + } + + pub fn take_events(&mut self) -> WebhookEvents { + std::mem::take(&mut self.pending_events) + } +} diff --git a/crates/common/src/webhooks/mod.rs b/crates/common/src/webhooks/mod.rs new file mode 100644 index 00000000..e3fce1ad --- /dev/null +++ b/crates/common/src/webhooks/mod.rs @@ -0,0 +1,351 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::{net::IpAddr, sync::Arc, time::Duration}; + +use ahash::{AHashMap, AHashSet}; +use chrono::{DateTime, Utc}; +use hyper::HeaderMap; +use mail_auth::report::{ + tlsrpt::{PolicyType, ResultType}, + AuthFailureType, DeliveryResult, FeedbackType, IdentityAlignment, +}; +use serde::{Deserialize, Serialize}; + +use crate::config::server::ServerProtocol; + +pub mod collector; +pub mod manager; + +#[derive(Clone, Default)] +pub struct Webhooks { + pub events: AHashSet, + pub hooks: AHashMap>, +} + +#[derive(Clone)] +pub struct Webhook { + pub id: u64, + pub url: String, + pub key: String, + pub timeout: Duration, + pub throttle: Duration, + pub tls_allow_invalid_certs: bool, + pub headers: HeaderMap, + pub events: AHashSet, +} + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct WebhookEvents { + pub events: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WebhookEvent { + pub id: u64, + #[serde(rename = "createdAt")] + pub created_at: DateTime, + #[serde(rename = "type")] + pub typ: WebhookType, + pub data: Arc, +} + +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)] +pub enum WebhookType { + #[serde(rename = "auth.success")] + AuthSuccess, + #[serde(rename = "auth.failure")] + AuthFailure, + #[serde(rename = "auth.banned")] + AuthBanned, + #[serde(rename = "auth.error")] + AuthError, + #[serde(rename = "message.accepted")] + MessageAccepted, + #[serde(rename = "message.rejected")] + MessageRejected, + #[serde(rename = "message.appended")] + MessageAppended, + #[serde(rename = "account.over-quota")] + AccountOverQuota, + #[serde(rename = "dsn")] + DSN, + #[serde(rename = "double-bounce")] + DoubleBounce, + #[serde(rename = "report.incoming.dmarc")] + IncomingDmarcReport, + #[serde(rename = "report.incoming.tls")] + IncomingTlsReport, + #[serde(rename = "report.incoming.arf")] + IncomingArfReport, + #[serde(rename = "report.outgoing")] + OutgoingReport, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +pub enum WebhookPayload { + Authentication { + login: String, + protocol: ServerProtocol, + #[serde(rename = "remoteIp")] + remote_ip: IpAddr, + #[serde(rename = "accountType")] + #[serde(skip_serializing_if = "Option::is_none")] + typ: Option, + #[serde(rename = "isMasterLogin")] + #[serde(skip_serializing_if = "Option::is_none")] + as_master: Option, + }, + Error { + message: String, + }, + MessageAccepted { + #[serde(rename = "queueId")] + id: u64, + #[serde(rename = "remoteIp")] + #[serde(skip_serializing_if = "Option::is_none")] + remote_ip: Option, + #[serde(rename = "localPort")] + #[serde(skip_serializing_if = "Option::is_none")] + local_port: Option, + #[serde(rename = "authenticatedAs")] + #[serde(skip_serializing_if = "Option::is_none")] + authenticated_as: Option, + #[serde(rename = "returnPath")] + return_path: String, + recipients: Vec, + #[serde(rename = "nextRetry")] + next_retry: String, + #[serde(rename = "nextDSN")] + next_dsn: String, + expires: String, + size: usize, + }, + MessageRejected { + reason: WebhookMessageFailure, + #[serde(rename = "remoteIp")] + remote_ip: IpAddr, + #[serde(rename = "localPort")] + local_port: u16, + #[serde(rename = "authenticatedAs")] + #[serde(skip_serializing_if = "Option::is_none")] + authenticated_as: Option, + #[serde(rename = "returnPath")] + #[serde(skip_serializing_if = "Option::is_none")] + return_path: Option, + #[serde(skip_serializing_if = "Vec::is_empty")] + recipients: Vec, + }, + MessageAppended { + #[serde(rename = "accountId")] + account_id: u32, + #[serde(rename = "mailboxIds")] + mailbox_ids: Vec, + source: WebhookIngestSource, + encrypt: bool, + size: usize, + }, + DSN { + #[serde(rename = "queueId")] + id: u64, + sender: String, + status: Vec, + #[serde(rename = "createdAt")] + created: String, + }, + IncomingDmarcReport { + #[serde(rename = "rangeFrom")] + range_from: String, + #[serde(rename = "rangeTo")] + range_to: String, + domain: String, + #[serde(rename = "reportEmail")] + report_email: String, + #[serde(rename = "reportId")] + report_id: String, + #[serde(rename = "dmarcPass")] + dmarc_pass: u32, + #[serde(rename = "dmarcQuarantine")] + dmarc_quarantine: u32, + #[serde(rename = "dmarcReject")] + dmarc_reject: u32, + #[serde(rename = "dmarcNone")] + dmarc_none: u32, + #[serde(rename = "dkimPass")] + dkim_pass: u32, + #[serde(rename = "dkimFail")] + dkim_fail: u32, + #[serde(rename = "dkimNone")] + dkim_none: u32, + #[serde(rename = "spfPass")] + spf_pass: u32, + #[serde(rename = "spfFail")] + spf_fail: u32, + #[serde(rename = "spfNone")] + spf_none: u32, + }, + IncomingTlsReport { + policies: Vec, + }, + IncomingArfReport { + #[serde(rename = "feedbackType")] + feedback_type: FeedbackType, + #[serde(rename = "arrivalDate")] + #[serde(skip_serializing_if = "Option::is_none")] + arrival_date: Option, + #[serde(rename = "authenticationResults")] + #[serde(skip_serializing_if = "Vec::is_empty")] + authentication_results: Vec, + incidents: u32, + #[serde(rename = "reportedDomains")] + #[serde(skip_serializing_if = "Vec::is_empty")] + reported_domain: Vec, + #[serde(rename = "reportedUris")] + #[serde(skip_serializing_if = "Vec::is_empty")] + reported_uri: Vec, + #[serde(rename = "reportingMTA")] + #[serde(skip_serializing_if = "Option::is_none")] + reporting_mta: Option, + #[serde(rename = "sourceIp")] + #[serde(skip_serializing_if = "Option::is_none")] + source_ip: Option, + #[serde(rename = "userAgent")] + #[serde(skip_serializing_if = "Option::is_none")] + user_agent: Option, + #[serde(rename = "authFailureType")] + #[serde(skip_serializing_if = "has_no_auth_failure")] + auth_failure: AuthFailureType, + #[serde(rename = "deliveryResult")] + #[serde(skip_serializing_if = "has_no_delivery_result")] + delivery_result: DeliveryResult, + #[serde(rename = "dkimDomain")] + #[serde(skip_serializing_if = "Option::is_none")] + dkim_domain: Option, + #[serde(rename = "dkimIdentity")] + #[serde(skip_serializing_if = "Option::is_none")] + dkim_identity: Option, + #[serde(rename = "dkimSelector")] + #[serde(skip_serializing_if = "Option::is_none")] + dkim_selector: Option, + #[serde(rename = "identityAlignment")] + #[serde(skip_serializing_if = "has_no_alignment")] + identity_alignment: IdentityAlignment, + }, + AccountOverQuota { + #[serde(rename = "accountId")] + account_id: u32, + #[serde(rename = "quotaLimit")] + quota_limit: usize, + #[serde(rename = "quotaUsed")] + quota_used: usize, + #[serde(rename = "objectSize")] + object_size: usize, + }, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WebhookTlsPolicy { + #[serde(rename = "rangeFrom")] + pub range_from: String, + #[serde(rename = "rangeTo")] + pub range_to: String, + pub domain: String, + #[serde(rename = "reportContact")] + #[serde(skip_serializing_if = "Option::is_none")] + pub report_contact: Option, + #[serde(rename = "reportId")] + pub report_id: String, + #[serde(rename = "policyType")] + pub policy_type: PolicyType, + #[serde(rename = "totalSuccesses")] + pub total_successes: u32, + #[serde(rename = "totalFailures")] + pub total_failures: u32, + pub details: AHashMap, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WebhookDSN { + pub address: String, + #[serde(rename = "remoteHost")] + #[serde(skip_serializing_if = "Option::is_none")] + pub remote_host: Option, + #[serde(rename = "type")] + pub typ: WebhookDSNType, + pub message: String, + #[serde(rename = "nextRetry")] + #[serde(skip_serializing_if = "Option::is_none")] + pub next_retry: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub expires: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "retryCount")] + pub retry_count: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum WebhookDSNType { + Success, + TemporaryFailure, + PermanentFailure, +} + +#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[serde(rename_all = "camelCase")] +pub enum WebhookMessageFailure { + ParseFailed, + LoopDetected, + DkimPolicy, + ArcPolicy, + DmarcPolicy, + MilterReject, + SieveDiscard, + SieveReject, + QuotaExceeded, + ServerFailure, +} + +#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[serde(rename_all = "lowercase")] +pub enum WebhookIngestSource { + Smtp, + Jmap, + Imap, +} + +fn has_no_alignment(alignment: &IdentityAlignment) -> bool { + matches!( + alignment, + IdentityAlignment::None | IdentityAlignment::Unspecified + ) +} + +fn has_no_delivery_result(result: &DeliveryResult) -> bool { + matches!(result, DeliveryResult::Unspecified) +} + +fn has_no_auth_failure(failure: &AuthFailureType) -> bool { + matches!(failure, AuthFailureType::Unspecified) +} diff --git a/crates/directory/src/backend/ldap/lookup.rs b/crates/directory/src/backend/ldap/lookup.rs index ad0bdf0a..957b418c 100644 --- a/crates/directory/src/backend/ldap/lookup.rs +++ b/crates/directory/src/backend/ldap/lookup.rs @@ -368,7 +368,9 @@ impl LdapMappings { "posixaccount" | "individual" | "person" | "inetorgperson" => { principal.typ = Type::Individual } - "posixgroup" | "groupofuniquenames" | "group" => principal.typ = Type::Group, + "posixgroup" | "groupofuniquenames" | "group" => { + principal.typ = Type::Group + } _ => continue, } break; diff --git a/crates/directory/src/lib.rs b/crates/directory/src/lib.rs index abcda5be..b59e1472 100644 --- a/crates/directory/src/lib.rs +++ b/crates/directory/src/lib.rs @@ -22,7 +22,10 @@ */ use core::cache::CachedDirectory; -use std::{fmt::Debug, sync::Arc}; +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; use ahash::AHashMap; use backend::{ @@ -311,3 +314,18 @@ impl PartialEq for DirectoryError { } } } + +impl Display for DirectoryError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Ldap(error) => write!(f, "LDAP error: {}", error), + Self::Store(error) => write!(f, "Store error: {}", error), + Self::Imap(error) => write!(f, "IMAP error: {}", error), + Self::Smtp(error) => write!(f, "SMTP error: {}", error), + Self::Pool(error) => write!(f, "Pool error: {}", error), + Self::Management(error) => write!(f, "Management error: {:?}", error), + Self::TimedOut => write!(f, "Directory timed out"), + Self::Unsupported => write!(f, "Method not supported by directory"), + } + } +} diff --git a/crates/imap/src/op/append.rs b/crates/imap/src/op/append.rs index 91811fd9..ccfaee1e 100644 --- a/crates/imap/src/op/append.rs +++ b/crates/imap/src/op/append.rs @@ -31,7 +31,7 @@ use imap_proto::{ use crate::core::{ImapUidToId, MailboxId, SelectedMailbox, Session, SessionData}; use common::listener::SessionStream; -use jmap::email::ingest::IngestEmail; +use jmap::email::ingest::{IngestEmail, IngestSource}; use jmap_proto::types::{acl::Acl, keyword::Keyword, state::StateChange, type_state::DataType}; use mail_parser::MessageParser; @@ -131,7 +131,7 @@ impl SessionData { mailbox_ids: vec![mailbox_id], keywords: message.flags.into_iter().map(Keyword::from).collect(), received_at: message.received_at.map(|d| d as u64), - skip_duplicates: false, + source: IngestSource::Imap, encrypt: self.jmap.core.jmap.encrypt && self.jmap.core.jmap.encrypt_append, }) .await diff --git a/crates/imap/src/op/authenticate.rs b/crates/imap/src/op/authenticate.rs index ea7095f5..a300dac2 100644 --- a/crates/imap/src/op/authenticate.rs +++ b/crates/imap/src/op/authenticate.rs @@ -21,7 +21,7 @@ * for more details. */ -use common::{listener::SessionStream, AuthResult}; +use common::{config::server::ServerProtocol, listener::SessionStream, AuthResult}; use imap_proto::{ protocol::{authenticate::Mechanism, capability::Capability}, receiver::{self, Request}, @@ -122,7 +122,7 @@ impl Session { Credentials::Plain { username, secret } | Credentials::XOauth2 { username, secret } => { match self .jmap - .authenticate_plain(&username, &secret, self.remote_addr) + .authenticate_plain(&username, &secret, self.remote_addr, ServerProtocol::Imap) .await { AuthResult::Success(token) => Some(token), diff --git a/crates/jmap/src/auth/authenticate.rs b/crates/jmap/src/auth/authenticate.rs index 4742992a..8c320a21 100644 --- a/crates/jmap/src/auth/authenticate.rs +++ b/crates/jmap/src/auth/authenticate.rs @@ -23,7 +23,7 @@ use std::{net::IpAddr, sync::Arc, time::Instant}; -use common::{listener::limiter::InFlight, AuthResult}; +use common::{config::server::ServerProtocol, listener::limiter::InFlight, AuthResult}; use directory::{Principal, QueryBy}; use hyper::header; use jmap_proto::error::request::RequestError; @@ -63,8 +63,9 @@ impl JMAP { }) }) { - if let AuthResult::Success(access_token) = - self.authenticate_plain(&account, &secret, remote_ip).await + if let AuthResult::Success(access_token) = self + .authenticate_plain(&account, &secret, remote_ip, ServerProtocol::Http) + .await { Some(access_token) } else { @@ -154,16 +155,19 @@ impl JMAP { username: &str, secret: &str, remote_ip: IpAddr, + protocol: ServerProtocol, ) -> AuthResult { match self .core .authenticate( &self.core.storage.directory, + &self.smtp.inner.ipc, &Credentials::Plain { username: username.to_string(), secret: secret.to_string(), }, remote_ip, + protocol, true, ) .await diff --git a/crates/jmap/src/email/copy.rs b/crates/jmap/src/email/copy.rs index 2fa28803..a938b968 100644 --- a/crates/jmap/src/email/copy.rs +++ b/crates/jmap/src/email/copy.rs @@ -314,8 +314,9 @@ impl JMAP { }; // Check quota - if account_quota > 0 - && metadata.size as i64 + self.get_used_quota(account_id).await? > account_quota + if !self + .has_available_quota(account_id, account_quota, metadata.size as i64) + .await? { return Ok(Err(SetError::over_quota())); } diff --git a/crates/jmap/src/email/import.rs b/crates/jmap/src/email/import.rs index 08857eba..dd405ebd 100644 --- a/crates/jmap/src/email/import.rs +++ b/crates/jmap/src/email/import.rs @@ -41,7 +41,7 @@ use utils::map::vec_map::VecMap; use crate::{auth::AccessToken, IngestError, JMAP}; -use super::ingest::IngestEmail; +use super::ingest::{IngestEmail, IngestSource}; impl JMAP { pub async fn email_import( @@ -140,7 +140,7 @@ impl JMAP { mailbox_ids, keywords: email.keywords, received_at: email.received_at.map(|r| r.into()), - skip_duplicates: true, + source: IngestSource::Jmap, encrypt: self.core.jmap.encrypt && self.core.jmap.encrypt_append, }) .await diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs index 0c3921a6..2fc41699 100644 --- a/crates/jmap/src/email/ingest.rs +++ b/crates/jmap/src/email/ingest.rs @@ -23,6 +23,7 @@ use std::{borrow::Cow, time::Duration}; +use common::webhooks::{WebhookIngestSource, WebhookPayload, WebhookType}; use jmap_proto::{ object::Object, types::{ @@ -76,10 +77,17 @@ pub struct IngestEmail<'x> { pub mailbox_ids: Vec, pub keywords: Vec, pub received_at: Option, - pub skip_duplicates: bool, + pub source: IngestSource, pub encrypt: bool, } +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum IngestSource { + Smtp, + Jmap, + Imap, +} + const MAX_RETRIES: u32 = 10; impl JMAP { @@ -90,13 +98,10 @@ impl JMAP { ) -> Result { // Check quota let mut raw_message_len = params.raw_message.len() as i64; - if params.account_quota > 0 - && raw_message_len - + self - .get_used_quota(params.account_id) - .await - .map_err(|_| IngestError::Temporary)? - > params.account_quota + if !self + .has_available_quota(params.account_id, params.account_quota, raw_message_len) + .await + .map_err(|_| IngestError::Temporary)? { return Err(IngestError::OverQuota); } @@ -162,7 +167,7 @@ impl JMAP { } // Check for duplicates - if params.skip_duplicates + if params.source == IngestSource::Smtp && !message_id.is_empty() && !self .core @@ -387,6 +392,31 @@ impl JMAP { size = raw_message_len, "Ingested e-mail."); + // Send webhook event + if self + .core + .has_webhook_subscribers(WebhookType::MessageAppended) + { + self.smtp + .inner + .ipc + .send_webhook( + WebhookType::MessageAppended, + WebhookPayload::MessageAppended { + account_id: params.account_id, + mailbox_ids: params.mailbox_ids, + source: match params.source { + IngestSource::Smtp => WebhookIngestSource::Smtp, + IngestSource::Jmap => WebhookIngestSource::Jmap, + IngestSource::Imap => WebhookIngestSource::Imap, + }, + encrypt: params.encrypt, + size: raw_message_len as usize, + }, + ) + .await; + } + Ok(IngestedEmail { id, change_id, diff --git a/crates/jmap/src/email/set.rs b/crates/jmap/src/email/set.rs index 89d8f7cd..19f01116 100644 --- a/crates/jmap/src/email/set.rs +++ b/crates/jmap/src/email/set.rs @@ -63,7 +63,7 @@ use crate::{auth::AccessToken, mailbox::UidMailbox, IngestError, JMAP}; use super::{ headers::{BuildHeader, ValueToHeader}, - ingest::IngestEmail, + ingest::{IngestEmail, IngestSource}, }; impl JMAP { @@ -737,7 +737,7 @@ impl JMAP { mailbox_ids: mailboxes, keywords, received_at, - skip_duplicates: false, + source: IngestSource::Jmap, encrypt: self.core.jmap.encrypt && self.core.jmap.encrypt_append, }) .await diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs index 264de7fd..1840710d 100644 --- a/crates/jmap/src/lib.rs +++ b/crates/jmap/src/lib.rs @@ -29,7 +29,11 @@ use std::{ }; use auth::{rate_limit::ConcurrencyLimiters, AccessToken}; -use common::{manager::webadmin::WebAdminManager, Core, DeliveryEvent, SharedCore}; +use common::{ + manager::webadmin::WebAdminManager, + webhooks::{WebhookPayload, WebhookType}, + Core, DeliveryEvent, SharedCore, +}; use dashmap::DashMap; use directory::QueryBy; use email::cache::Threads; @@ -404,6 +408,43 @@ impl JMAP { }) } + pub async fn has_available_quota( + &self, + account_id: u32, + account_quota: i64, + item_size: i64, + ) -> Result { + if account_quota == 0 { + return Ok(true); + } + let used_quota = self.get_used_quota(account_id).await?; + if used_quota + item_size <= account_quota { + Ok(true) + } else { + // Send webhook + if self + .core + .has_webhook_subscribers(WebhookType::AccountOverQuota) + { + self.smtp + .inner + .ipc + .send_webhook( + WebhookType::AccountOverQuota, + WebhookPayload::AccountOverQuota { + account_id, + quota_limit: account_quota as usize, + quota_used: used_quota as usize, + object_size: item_size as usize, + }, + ) + .await; + } + + Ok(false) + } + } + pub async fn filter( &self, account_id: u32, diff --git a/crates/jmap/src/push/manager.rs b/crates/jmap/src/push/manager.rs index bfee8259..9ef5cdd7 100644 --- a/crates/jmap/src/push/manager.rs +++ b/crates/jmap/src/push/manager.rs @@ -22,11 +22,12 @@ */ use base64::{engine::general_purpose, Engine}; +use common::IPC_CHANNEL_BUFFER; use jmap_proto::types::id::Id; use store::ahash::{AHashMap, AHashSet}; use tokio::sync::mpsc; -use crate::{api::StateChangeResponse, services::IPC_CHANNEL_BUFFER, JmapInstance, LONG_SLUMBER}; +use crate::{api::StateChangeResponse, JmapInstance, LONG_SLUMBER}; use super::{ece::ece_encrypt, EncryptionKeys, Event, PushServer, PushUpdate}; @@ -48,6 +49,9 @@ pub fn spawn_push_manager(core: JmapInstance) -> mpsc::Sender { let mut retry_ids = AHashSet::default(); loop { + // Wait for the next event or timeout + let event_or_timeout = tokio::time::timeout(retry_timeout, push_rx.recv()).await; + // Load settings let core_ = core.core.load(); let push_attempt_interval = core_.jmap.push_attempt_interval; @@ -57,7 +61,7 @@ pub fn spawn_push_manager(core: JmapInstance) -> mpsc::Sender { let push_verify_timeout = core_.jmap.push_verify_timeout; let push_throttle = core_.jmap.push_throttle; - match tokio::time::timeout(retry_timeout, push_rx.recv()).await { + match event_or_timeout { Ok(Some(event)) => match event { Event::Update { updates } => { for update in updates { diff --git a/crates/jmap/src/services/gossip/spawn.rs b/crates/jmap/src/services/gossip/spawn.rs index e880a0f8..84ad927c 100644 --- a/crates/jmap/src/services/gossip/spawn.rs +++ b/crates/jmap/src/services/gossip/spawn.rs @@ -22,11 +22,11 @@ */ use crate::auth::SymmetricEncrypt; -use crate::services::IPC_CHANNEL_BUFFER; use crate::JmapInstance; use super::request::Request; use super::{Gossiper, Peer, UDP_MAX_PAYLOAD}; +use common::IPC_CHANNEL_BUFFER; use std::net::IpAddr; use std::time::{Duration, Instant}; use std::{net::SocketAddr, sync::Arc}; diff --git a/crates/jmap/src/services/housekeeper.rs b/crates/jmap/src/services/housekeeper.rs index cd6393e5..f7d24816 100644 --- a/crates/jmap/src/services/housekeeper.rs +++ b/crates/jmap/src/services/housekeeper.rs @@ -26,14 +26,13 @@ use std::{ time::{Duration, Instant}, }; +use common::IPC_CHANNEL_BUFFER; use store::{write::purge::PurgeStore, BlobStore, LookupStore, Store}; use tokio::sync::mpsc; use utils::map::ttl_dashmap::TtlMap; use crate::{Inner, JmapInstance, JMAP, LONG_SLUMBER}; -use super::IPC_CHANNEL_BUFFER; - pub enum Event { IndexStart, IndexDone, diff --git a/crates/jmap/src/services/ingest.rs b/crates/jmap/src/services/ingest.rs index 90aaeb36..58e14ac9 100644 --- a/crates/jmap/src/services/ingest.rs +++ b/crates/jmap/src/services/ingest.rs @@ -27,7 +27,11 @@ use jmap_proto::types::{state::StateChange, type_state::DataType}; use mail_parser::MessageParser; use store::ahash::AHashMap; -use crate::{email::ingest::IngestEmail, mailbox::INBOX_ID, IngestError, JMAP}; +use crate::{ + email::ingest::{IngestEmail, IngestSource}, + mailbox::INBOX_ID, + IngestError, JMAP, +}; impl JMAP { pub async fn deliver_message(&self, message: IngestMessage) -> Vec { @@ -123,7 +127,7 @@ impl JMAP { mailbox_ids: vec![INBOX_ID], keywords: vec![], received_at: None, - skip_duplicates: true, + source: IngestSource::Smtp, encrypt: self.core.jmap.encrypt, }) .await diff --git a/crates/jmap/src/services/mod.rs b/crates/jmap/src/services/mod.rs index 21051a29..f3a92a89 100644 --- a/crates/jmap/src/services/mod.rs +++ b/crates/jmap/src/services/mod.rs @@ -27,5 +27,3 @@ pub mod housekeeper; pub mod index; pub mod ingest; pub mod state; - -pub const IPC_CHANNEL_BUFFER: usize = 1024; diff --git a/crates/jmap/src/services/state.rs b/crates/jmap/src/services/state.rs index 850ce6d1..289aa757 100644 --- a/crates/jmap/src/services/state.rs +++ b/crates/jmap/src/services/state.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant, SystemTime}; +use common::IPC_CHANNEL_BUFFER; use jmap_proto::types::{id::Id, state::StateChange, type_state::DataType}; use store::ahash::AHashMap; use tokio::sync::mpsc; @@ -33,8 +34,6 @@ use crate::{ JmapInstance, JMAP, }; -use super::IPC_CHANNEL_BUFFER; - #[derive(Debug)] pub enum Event { Subscribe { diff --git a/crates/jmap/src/sieve/ingest.rs b/crates/jmap/src/sieve/ingest.rs index 955c914c..6fe0d172 100644 --- a/crates/jmap/src/sieve/ingest.rs +++ b/crates/jmap/src/sieve/ingest.rs @@ -35,7 +35,7 @@ use store::{ }; use crate::{ - email::ingest::{IngestEmail, IngestedEmail}, + email::ingest::{IngestEmail, IngestSource, IngestedEmail}, mailbox::{INBOX_ID, TRASH_ID}, sieve::SeenIdHash, IngestError, JMAP, @@ -446,7 +446,7 @@ impl JMAP { mailbox_ids: sieve_message.file_into, keywords: sieve_message.flags, received_at: None, - skip_duplicates: true, + source: IngestSource::Smtp, encrypt: self.core.jmap.encrypt, }) .await diff --git a/crates/jmap/src/sieve/set.rs b/crates/jmap/src/sieve/set.rs index 946f58c1..d8cb964c 100644 --- a/crates/jmap/src/sieve/set.rs +++ b/crates/jmap/src/sieve/set.rs @@ -144,9 +144,13 @@ impl JMAP { _ => unreachable!(), } } else { - ctx.response.not_created.append(id, SetError::new(SetErrorType::OverQuota).with_description( - "There are too many sieve scripts, please delete some before adding a new one.", - )); + ctx.response.not_created.append( + id, + SetError::new(SetErrorType::OverQuota).with_description(concat!( + "There are too many sieve scripts, ", + "please delete some before adding a new one." + )), + ); } } @@ -422,9 +426,10 @@ impl JMAP { // Vacation script cannot be modified if matches!(update.as_ref().and_then(|(_, obj)| obj.inner.properties.get(&Property::Name)), Some(Value::Text ( value )) if value.eq_ignore_ascii_case("vacation")) { - return Ok(Err(SetError::forbidden().with_description( - "The 'vacation' script cannot be modified, use VacationResponse/set instead.", - ))); + return Ok(Err(SetError::forbidden().with_description(concat!( + "The 'vacation' script cannot be modified, ", + "use VacationResponse/set instead." + )))); } // Parse properties @@ -521,12 +526,12 @@ impl JMAP { // Check access if let Some(mut bytes) = self.blob_download(&blob_id, ctx.access_token).await? { // Check quota - if ctx.account_quota > 0 - && bytes.len() as i64 + self.get_used_quota(ctx.account_id).await? - > ctx.account_quota - { - return Ok(Err(SetError::over_quota())); - } + if !self + .has_available_quota(ctx.account_id, ctx.account_quota, bytes.len() as i64) + .await? + { + return Ok(Err(SetError::over_quota())); + } // Compile script match self.core.sieve.untrusted_compiler.compile(&bytes) { diff --git a/crates/main/Cargo.toml b/crates/main/Cargo.toml index 4c6698a5..6ac038bf 100644 --- a/crates/main/Cargo.toml +++ b/crates/main/Cargo.toml @@ -19,7 +19,7 @@ path = "src/main.rs" store = { path = "../store" } jmap = { path = "../jmap" } jmap_proto = { path = "../jmap-proto" } -smtp = { path = "../smtp", features = ["local_delivery"] } +smtp = { path = "../smtp" } imap = { path = "../imap" } pop3 = { path = "../pop3" } managesieve = { path = "../managesieve" } diff --git a/crates/main/src/main.rs b/crates/main/src/main.rs index 31e93a61..e2f99121 100644 --- a/crates/main/src/main.rs +++ b/crates/main/src/main.rs @@ -23,13 +23,12 @@ use std::time::Duration; -use common::{config::server::ServerProtocol, manager::boot::BootManager}; -use imap::core::{ImapSessionManager, IMAP}; -use jmap::{ - api::JmapSessionManager, - services::{gossip::spawn::GossiperBuilder, IPC_CHANNEL_BUFFER}, - JMAP, +use common::{ + config::server::ServerProtocol, manager::boot::BootManager, + webhooks::manager::spawn_webhook_manager, Ipc, IPC_CHANNEL_BUFFER, }; +use imap::core::{ImapSessionManager, IMAP}; +use jmap::{api::JmapSessionManager, services::gossip::spawn::GossiperBuilder, JMAP}; use managesieve::core::ManageSieveSessionManager; use pop3::Pop3SessionManager; use smtp::core::{SmtpSessionManager, SMTP}; @@ -52,9 +51,18 @@ async fn main() -> std::io::Result<()> { let mut config = init.config; let core = init.core; - // Init servers + // Spawn webhook manager + let webhook_tx = spawn_webhook_manager(core.clone()); + + // Setup IPC channels let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER); - let smtp = SMTP::init(&mut config, core.clone(), delivery_tx).await; + let ipc = Ipc { + delivery_tx, + webhook_tx, + }; + + // Init servers + let smtp = SMTP::init(&mut config, core.clone(), ipc).await; let jmap = JMAP::init(&mut config, delivery_rx, core.clone(), smtp.inner.clone()).await; let imap = IMAP::init(&mut config, jmap.clone()).await; let gossiper = GossiperBuilder::try_parse(&mut config); @@ -101,7 +109,7 @@ async fn main() -> std::io::Result<()> { // Spawn gossip if let Some(gossiper) = gossiper { - gossiper.spawn(jmap, shutdown_rx).await; + gossiper.spawn(jmap, shutdown_rx.clone()).await; } // Wait for shutdown signal diff --git a/crates/managesieve/src/op/authenticate.rs b/crates/managesieve/src/op/authenticate.rs index 5b045c36..672b8fa9 100644 --- a/crates/managesieve/src/op/authenticate.rs +++ b/crates/managesieve/src/op/authenticate.rs @@ -22,6 +22,7 @@ */ use common::{ + config::server::ServerProtocol, listener::{limiter::ConcurrencyLimiter, SessionStream}, AuthResult, }; @@ -99,7 +100,12 @@ impl Session { Credentials::Plain { username, secret } | Credentials::XOauth2 { username, secret } => { match self .jmap - .authenticate_plain(&username, &secret, self.remote_addr) + .authenticate_plain( + &username, + &secret, + self.remote_addr, + ServerProtocol::ManageSieve, + ) .await { AuthResult::Success(token) => Some(token), diff --git a/crates/managesieve/src/op/putscript.rs b/crates/managesieve/src/op/putscript.rs index e41bc809..01974a05 100644 --- a/crates/managesieve/src/op/putscript.rs +++ b/crates/managesieve/src/op/putscript.rs @@ -55,12 +55,18 @@ impl Session { // Check quota let access_token = self.state.access_token(); let account_id = access_token.primary_id(); - if access_token.quota > 0 - && script_bytes.len() as i64 + self.jmap.get_used_quota(account_id).await? - > access_token.quota as i64 + if !self + .jmap + .has_available_quota( + account_id, + access_token.quota as i64, + script_bytes.len() as i64, + ) + .await? { return Err(StatusResponse::no("Quota exceeded.").with_code(ResponseCode::Quota)); } + if self .jmap .get_document_ids(account_id, Collection::SieveScript) diff --git a/crates/pop3/src/op/authenticate.rs b/crates/pop3/src/op/authenticate.rs index 252d5f2e..857a2d92 100644 --- a/crates/pop3/src/op/authenticate.rs +++ b/crates/pop3/src/op/authenticate.rs @@ -22,6 +22,7 @@ */ use common::{ + config::server::ServerProtocol, listener::{limiter::ConcurrencyLimiter, SessionStream}, AuthResult, }; @@ -103,7 +104,7 @@ impl Session { Credentials::Plain { username, secret } | Credentials::XOauth2 { username, secret } => { match self .jmap - .authenticate_plain(&username, &secret, self.remote_addr) + .authenticate_plain(&username, &secret, self.remote_addr, ServerProtocol::Pop3) .await { AuthResult::Success(token) => Some(token), diff --git a/crates/smtp/Cargo.toml b/crates/smtp/Cargo.toml index da35f016..1a924c6c 100644 --- a/crates/smtp/Cargo.toml +++ b/crates/smtp/Cargo.toml @@ -61,7 +61,6 @@ bincode = "1.3.1" [features] test_mode = [] -local_delivery = [] #[[bench]] #name = "hash" diff --git a/crates/smtp/src/core/mod.rs b/crates/smtp/src/core/mod.rs index 99afd5e0..960291f3 100644 --- a/crates/smtp/src/core/mod.rs +++ b/crates/smtp/src/core/mod.rs @@ -29,12 +29,12 @@ use std::{ }; use common::{ - config::smtp::auth::VerifyStrategy, + config::{scripts::ScriptCache, smtp::auth::VerifyStrategy}, listener::{ limiter::{ConcurrencyLimiter, InFlight}, ServerInstance, }, - Core, DeliveryEvent, SharedCore, + Core, Ipc, SharedCore, }; use dashmap::DashMap; use directory::Directory; @@ -100,8 +100,8 @@ pub struct Inner { pub report_tx: mpsc::Sender, pub snowflake_id: SnowflakeIdGenerator, pub connectors: TlsConnectors, - #[cfg(feature = "local_delivery")] - pub delivery_tx: mpsc::Sender, + pub ipc: Ipc, + pub script_cache: ScriptCache, } pub struct TlsConnectors { @@ -279,7 +279,6 @@ impl From for SMTP { } } -#[cfg(feature = "local_delivery")] lazy_static::lazy_static! { static ref SIEVE: Arc = Arc::new(ServerInstance { id: "sieve".to_string(), @@ -291,7 +290,6 @@ static ref SIEVE: Arc = Arc::new(ServerInstance { }); } -#[cfg(feature = "local_delivery")] impl Session { pub fn local(core: SMTP, instance: std::sync::Arc, data: SessionData) -> Self { Session { @@ -368,7 +366,6 @@ impl Session { } } -#[cfg(feature = "local_delivery")] impl SessionData { pub fn local( mail_from: Option, @@ -404,14 +401,12 @@ impl SessionData { } } -#[cfg(feature = "local_delivery")] impl Default for SessionData { fn default() -> Self { Self::local(None, vec![], vec![]) } } -#[cfg(feature = "local_delivery")] impl SessionAddress { pub fn new(address: String) -> Self { let address_lcase = address.to_lowercase(); @@ -438,7 +433,11 @@ impl Default for Inner { pki_verify: mail_send::smtp::tls::build_tls_connector(false), dummy_verify: mail_send::smtp::tls::build_tls_connector(true), }, - delivery_tx: mpsc::channel(1).0, + ipc: Ipc { + delivery_tx: mpsc::channel(1).0, + webhook_tx: mpsc::channel(1).0, + }, + script_cache: Default::default(), } } } diff --git a/crates/smtp/src/inbound/auth.rs b/crates/smtp/src/inbound/auth.rs index 62360d2c..2a9dcf0e 100644 --- a/crates/smtp/src/inbound/auth.rs +++ b/crates/smtp/src/inbound/auth.rs @@ -183,7 +183,14 @@ impl Session { match self .core .core - .authenticate(directory, &credentials, self.data.remote_ip, false) + .authenticate( + directory, + &self.core.inner.ipc, + &credentials, + self.data.remote_ip, + self.instance.protocol, + false, + ) .await { Ok(AuthResult::Success(principal)) => { diff --git a/crates/smtp/src/inbound/data.rs b/crates/smtp/src/inbound/data.rs index 7042bc59..1b3bb7ce 100644 --- a/crates/smtp/src/inbound/data.rs +++ b/crates/smtp/src/inbound/data.rs @@ -32,12 +32,14 @@ use common::{ config::smtp::{auth::VerifyStrategy, session::Stage}, listener::SessionStream, scripts::ScriptModification, + webhooks::{WebhookMessageFailure, WebhookPayload, WebhookType}, }; use mail_auth::{ common::{headers::HeaderWriter, verify::VerifySignature}, dmarc, AuthenticatedMessage, AuthenticationResults, DkimResult, DmarcResult, ReceivedSpf, }; use mail_builder::headers::{date::Date, message_id::generate_message_id_header}; +use mail_parser::DateTime; use sieve::runtime::Variable; use smtp_proto::{ MAIL_BY_RETURN, RCPT_NOTIFY_DELAY, RCPT_NOTIFY_FAILURE, RCPT_NOTIFY_NEVER, RCPT_NOTIFY_SUCCESS, @@ -70,6 +72,9 @@ impl Session { event = "parse-failed", size = raw_message.len()); + self.send_failure_webhook(WebhookMessageFailure::ParseFailed) + .await; + return (&b"550 5.7.7 Failed to parse message.\r\n"[..]).into(); }; @@ -91,6 +96,10 @@ impl Session { return_path = self.data.mail_from.as_ref().unwrap().address, from = auth_message.from(), received_headers = auth_message.received_headers_count()); + + self.send_failure_webhook(WebhookMessageFailure::LoopDetected) + .await; + return (&b"450 4.4.6 Too many Received headers. Possible loop detected.\r\n"[..]) .into(); } @@ -141,6 +150,9 @@ impl Session { result = ?dkim_output.iter().map(|d| d.result().to_string()).collect::>(), "No passing DKIM signatures found."); + self.send_failure_webhook(WebhookMessageFailure::DkimPolicy) + .await; + // 'Strict' mode violates the advice of Section 6.1 of RFC6376 return if dkim_output .iter() @@ -197,6 +209,9 @@ impl Session { result = %arc_output.result(), "ARC validation failed."); + self.send_failure_webhook(WebhookMessageFailure::ArcPolicy) + .await; + return if matches!(arc_output.result(), DkimResult::TempError(_)) { (&b"451 4.7.29 ARC validation failed.\r\n"[..]).into() } else { @@ -316,6 +331,9 @@ impl Session { } if rejected { + self.send_failure_webhook(WebhookMessageFailure::DmarcPolicy) + .await; + return if is_temp_fail { (&b"451 4.7.1 Email temporarily rejected per DMARC policy.\r\n"[..]).into() } else { @@ -421,7 +439,12 @@ impl Session { modifications = modifications_; } } - Err(response) => return response.into_bytes(), + Err(response) => { + self.send_failure_webhook(WebhookMessageFailure::MilterReject) + .await; + + return response.into_bytes(); + } }; // Run JMilter filters @@ -438,7 +461,12 @@ impl Session { modifications.extend(modifications_); } } - Err(response) => return response.into_bytes(), + Err(response) => { + self.send_failure_webhook(WebhookMessageFailure::MilterReject) + .await; + + return response.into_bytes(); + } }; // Apply modifications @@ -624,9 +652,15 @@ impl Session { event = "reject", reason = message); + self.send_failure_webhook(WebhookMessageFailure::SieveReject) + .await; + return message.into_bytes().into(); } ScriptResult::Discard => { + self.send_failure_webhook(WebhookMessageFailure::SieveDiscard) + .await; + return (b"250 2.0.0 Message queued for delivery.\r\n"[..]).into(); } }; @@ -725,15 +759,52 @@ impl Session { // Verify queue quota if self.core.has_quota(&mut message).await { + // Prepare webhook event let queue_id = message.id; + let webhook_event = self + .core + .core + .has_webhook_subscribers(WebhookType::MessageAccepted) + .then(|| WebhookPayload::MessageAccepted { + id: queue_id, + remote_ip: self.data.remote_ip.into(), + local_port: self.data.local_port.into(), + authenticated_as: (!self.data.authenticated_as.is_empty()) + .then(|| self.data.authenticated_as.clone()), + return_path: message.return_path_lcase.clone(), + recipients: message + .recipients + .iter() + .map(|r| r.address_lcase.clone()) + .collect(), + next_retry: DateTime::from_timestamp(message.next_delivery_event() as i64) + .to_rfc3339(), + next_dsn: DateTime::from_timestamp(message.next_dsn() as i64).to_rfc3339(), + expires: DateTime::from_timestamp(message.expires() as i64).to_rfc3339(), + size: message.size, + }); + + // Queue message if message .queue(Some(&headers), raw_message, &self.core, &self.span) .await { + // Send webhook event + if let Some(event) = webhook_event { + self.core + .inner + .ipc + .send_webhook(WebhookType::MessageAccepted, event) + .await; + } + self.state = State::Accepted(queue_id); self.data.messages_sent += 1; (b"250 2.0.0 Message queued for delivery.\r\n"[..]).into() } else { + self.send_failure_webhook(WebhookMessageFailure::ServerFailure) + .await; + (b"451 4.3.5 Unable to accept message at this time.\r\n"[..]).into() } } else { @@ -744,6 +815,10 @@ impl Session { from = message.return_path, "Queue quota exceeded, rejecting message." ); + + self.send_failure_webhook(WebhookMessageFailure::QuotaExceeded) + .await; + (b"452 4.3.1 Mail system full, try again later.\r\n"[..]).into() } } @@ -955,4 +1030,38 @@ impl Session { headers.extend_from_slice(Date::now().to_rfc822().as_bytes()); headers.extend_from_slice(b"\r\n"); } + + async fn send_failure_webhook(&self, reason: WebhookMessageFailure) { + if self + .core + .core + .has_webhook_subscribers(WebhookType::MessageRejected) + { + self.core + .inner + .ipc + .send_webhook( + WebhookType::MessageRejected, + WebhookPayload::MessageRejected { + reason, + remote_ip: self.data.remote_ip, + local_port: self.data.local_port, + authenticated_as: (!self.data.authenticated_as.is_empty()) + .then(|| self.data.authenticated_as.clone()), + return_path: self + .data + .mail_from + .as_ref() + .map(|m| m.address_lcase.clone()), + recipients: self + .data + .rcpt_to + .iter() + .map(|r| r.address_lcase.clone()) + .collect(), + }, + ) + .await; + } + } } diff --git a/crates/smtp/src/inbound/ehlo.rs b/crates/smtp/src/inbound/ehlo.rs index 4bfe0eab..6360a194 100644 --- a/crates/smtp/src/inbound/ehlo.rs +++ b/crates/smtp/src/inbound/ehlo.rs @@ -28,7 +28,7 @@ use common::{ config::smtp::session::{Mechanism, Stage}, listener::SessionStream, }; -use mail_auth::spf::verify::HasLabels; +use mail_auth::spf::verify::HasValidLabels; use smtp_proto::*; impl Session { @@ -37,7 +37,7 @@ impl Session { if domain != self.data.helo_domain { // Reject non-FQDN EHLO domains - simply checks that the hostname has at least one dot - if self.params.ehlo_reject_non_fqdn && !domain.as_str().has_labels() { + if self.params.ehlo_reject_non_fqdn && !domain.as_str().has_valid_labels() { tracing::info!(parent: &self.span, context = "ehlo", event = "reject", diff --git a/crates/smtp/src/inbound/jmilter/client.rs b/crates/smtp/src/inbound/jmilter/client.rs index 3131aa93..c56cdf26 100644 --- a/crates/smtp/src/inbound/jmilter/client.rs +++ b/crates/smtp/src/inbound/jmilter/client.rs @@ -36,10 +36,14 @@ pub(super) async fn send_jmilter_request( .map_err(|err| format!("Failed to create HTTP client: {}", err))? .post(&jmilter.url) .headers(jmilter.headers.clone()) - .body(serde_json::to_string(&request).unwrap()) + .body( + serde_json::to_string(&request) + .map_err(|err| format!("Failed to serialize jMilter request: {}", err))?, + ) .send() .await .map_err(|err| format!("jMilter request failed: {err}"))?; + if response.status().is_success() { serde_json::from_slice( response diff --git a/crates/smtp/src/inbound/jmilter/message.rs b/crates/smtp/src/inbound/jmilter/message.rs index ee4cda27..23cba77b 100644 --- a/crates/smtp/src/inbound/jmilter/message.rs +++ b/crates/smtp/src/inbound/jmilter/message.rs @@ -25,6 +25,7 @@ use ahash::AHashMap; use common::{ config::smtp::session::{JMilter, Stage}, listener::SessionStream, + DAEMON_NAME, }; use mail_auth::AuthenticatedMessage; @@ -37,7 +38,6 @@ use crate::{ milter::Modification, FilterResponse, }, - DAEMON_NAME, }; use super::{client::send_jmilter_request, Action, Response}; diff --git a/crates/smtp/src/inbound/milter/message.rs b/crates/smtp/src/inbound/milter/message.rs index 40481742..778ff4e1 100644 --- a/crates/smtp/src/inbound/milter/message.rs +++ b/crates/smtp/src/inbound/milter/message.rs @@ -26,6 +26,7 @@ use std::borrow::Cow; use common::{ config::smtp::session::{Milter, Stage}, listener::SessionStream, + DAEMON_NAME, }; use mail_auth::AuthenticatedMessage; use smtp_proto::{request::parser::Rfc5321Parser, IntoString}; @@ -35,7 +36,6 @@ use crate::{ core::{Session, SessionAddress, SessionData}, inbound::{milter::MilterClient, FilterResponse}, queue::DomainPart, - DAEMON_NAME, }; use super::{Action, Error, Macros, Modification}; diff --git a/crates/smtp/src/inbound/session.rs b/crates/smtp/src/inbound/session.rs index 6e8f10b5..945f18d7 100644 --- a/crates/smtp/src/inbound/session.rs +++ b/crates/smtp/src/inbound/session.rs @@ -164,7 +164,7 @@ impl Session { } Request::Help { .. } => { self.write( - b"250 2.0.0 Help can be found at https://stalw.art/smtp/\r\n", + b"250 2.0.0 Help can be found at https://stalw.art/docs/\r\n", ) .await?; } diff --git a/crates/smtp/src/inbound/spawn.rs b/crates/smtp/src/inbound/spawn.rs index 6956cb98..6bcf2995 100644 --- a/crates/smtp/src/inbound/spawn.rs +++ b/crates/smtp/src/inbound/spawn.rs @@ -82,10 +82,10 @@ impl SessionManager for SmtpSessionManager { .report_tx .send(reporting::Event::Stop) .await; - #[cfg(feature = "local_delivery")] let _ = self .inner .inner + .ipc .delivery_tx .send(common::DeliveryEvent::Stop) .await; diff --git a/crates/smtp/src/lib.rs b/crates/smtp/src/lib.rs index 33476016..0eafdd11 100644 --- a/crates/smtp/src/lib.rs +++ b/crates/smtp/src/lib.rs @@ -24,7 +24,7 @@ use crate::core::{throttle::ThrottleKeyHasherBuilder, TlsConnectors}; use core::{Inner, SmtpInstance, SMTP}; -use common::SharedCore; +use common::{config::scripts::ScriptCache, Ipc, SharedCore}; use dashmap::DashMap; use mail_send::smtp::tls::build_tls_connector; use queue::manager::SpawnQueue; @@ -39,15 +39,8 @@ pub mod queue; pub mod reporting; pub mod scripts; -pub static USER_AGENT: &str = concat!("StalwartSMTP/", env!("CARGO_PKG_VERSION"),); -pub static DAEMON_NAME: &str = concat!("Stalwart SMTP v", env!("CARGO_PKG_VERSION"),); - impl SMTP { - pub async fn init( - config: &mut Config, - core: SharedCore, - #[cfg(feature = "local_delivery")] delivery_tx: mpsc::Sender, - ) -> SmtpInstance { + pub async fn init(config: &mut Config, core: SharedCore, ipc: Ipc) -> SmtpInstance { // Build inner let capacity = config.property("cache.capacity").unwrap_or(2); let shard = config @@ -77,8 +70,8 @@ impl SMTP { pki_verify: build_tls_connector(false), dummy_verify: build_tls_connector(true), }, - #[cfg(feature = "local_delivery")] - delivery_tx, + ipc, + script_cache: ScriptCache::parse(config), }; let inner = SmtpInstance::new(core, inner); diff --git a/crates/smtp/src/outbound/delivery.rs b/crates/smtp/src/outbound/delivery.rs index 2cad4304..2418fb60 100644 --- a/crates/smtp/src/outbound/delivery.rs +++ b/crates/smtp/src/outbound/delivery.rs @@ -202,13 +202,12 @@ impl DeliveryAttempt { .await .and_then(|name| core.core.get_relay_host(&name)) { - #[cfg(feature = "local_delivery")] Some(next_hop) if next_hop.protocol == ServerProtocol::Http => { // Deliver message locally let delivery_result = message .deliver_local( recipients.iter_mut().filter(|r| r.domain_idx == domain_idx), - &core.inner.delivery_tx, + &core.inner.ipc.delivery_tx, &span, ) .await; diff --git a/crates/smtp/src/outbound/mod.rs b/crates/smtp/src/outbound/mod.rs index 9c8e8831..e0e0db0f 100644 --- a/crates/smtp/src/outbound/mod.rs +++ b/crates/smtp/src/outbound/mod.rs @@ -36,7 +36,7 @@ use crate::queue::{ pub mod dane; pub mod delivery; -#[cfg(feature = "local_delivery")] + pub mod local; pub mod lookup; pub mod mta_sts; @@ -158,7 +158,6 @@ impl Status<(), Error> { })) } - #[cfg(feature = "local_delivery")] pub fn local_error() -> Self { Status::TemporaryFailure(Error::ConnectionError(ErrorDetails { entity: "localhost".to_string(), diff --git a/crates/smtp/src/queue/dsn.rs b/crates/smtp/src/queue/dsn.rs index c65fbd17..83f65e28 100644 --- a/crates/smtp/src/queue/dsn.rs +++ b/crates/smtp/src/queue/dsn.rs @@ -21,6 +21,7 @@ * for more details. */ +use common::webhooks::{WebhookDSN, WebhookDSNType, WebhookPayload, WebhookType}; use mail_builder::headers::content_type::ContentType; use mail_builder::headers::HeaderType; use mail_builder::mime::{make_boundary, BodyPart, MimePart}; @@ -42,7 +43,11 @@ use super::{ impl SMTP { pub async fn send_dsn(&self, message: &mut Message, span: &tracing::Span) { + // Send webhook event + self.send_dsn_webhook(message).await; + if !message.return_path.is_empty() { + // Build DSN if let Some(dsn) = message.build_dsn(self, span).await { let mut dsn_message = self.new_message("", "", ""); dsn_message @@ -58,14 +63,142 @@ impl SMTP { let signature = self .sign_message(message, &self.core.smtp.queue.dsn.sign, &dsn, span) .await; + + // Queue DSN dsn_message .queue(signature.as_deref(), &dsn, self, span) .await; } } else { + // Handle double bounce message.handle_double_bounce(span); } } + + async fn send_dsn_webhook(&self, message: &Message) { + let typ = if !message.return_path.is_empty() { + WebhookType::DSN + } else { + WebhookType::DoubleBounce + }; + if !self.core.has_webhook_subscribers(typ) { + return; + } + + let now = now(); + let mut webhook_data = Vec::new(); + + for rcpt in &message.recipients { + if rcpt.has_flag(RCPT_DSN_SENT) { + continue; + } + + let domain = &message.domains[rcpt.domain_idx]; + match &rcpt.status { + Status::Completed(response) => { + webhook_data.push(WebhookDSN { + address: rcpt.address_lcase.clone(), + typ: WebhookDSNType::Success, + remote_host: response.hostname.clone().into(), + message: response.response.to_string(), + next_retry: None, + expires: None, + retry_count: None, + }); + } + Status::TemporaryFailure(response) if domain.notify.due <= now => { + webhook_data.push(WebhookDSN { + address: rcpt.address_lcase.clone(), + typ: WebhookDSNType::TemporaryFailure, + remote_host: response.hostname.entity.clone().into(), + message: response.response.to_string(), + next_retry: DateTime::from_timestamp(domain.retry.due as i64) + .to_rfc3339() + .into(), + expires: DateTime::from_timestamp(domain.expires as i64) + .to_rfc3339() + .into(), + retry_count: domain.retry.inner.into(), + }); + } + Status::PermanentFailure(response) => { + webhook_data.push(WebhookDSN { + address: rcpt.address_lcase.clone(), + typ: WebhookDSNType::PermanentFailure, + remote_host: response.hostname.entity.clone().into(), + message: response.response.to_string(), + next_retry: None, + expires: None, + retry_count: domain.retry.inner.into(), + }); + } + Status::Scheduled => { + // There is no status for this address, use the domain's status. + match &domain.status { + Status::PermanentFailure(err) => { + webhook_data.push(WebhookDSN { + address: rcpt.address_lcase.clone(), + typ: WebhookDSNType::PermanentFailure, + remote_host: None, + message: err.to_string(), + next_retry: None, + expires: None, + retry_count: domain.retry.inner.into(), + }); + } + Status::TemporaryFailure(err) if domain.notify.due <= now => { + webhook_data.push(WebhookDSN { + address: rcpt.address_lcase.clone(), + typ: WebhookDSNType::TemporaryFailure, + remote_host: None, + message: err.to_string(), + next_retry: DateTime::from_timestamp(domain.retry.due as i64) + .to_rfc3339() + .into(), + expires: DateTime::from_timestamp(domain.expires as i64) + .to_rfc3339() + .into(), + retry_count: domain.retry.inner.into(), + }); + } + Status::Scheduled if domain.notify.due <= now => { + webhook_data.push(WebhookDSN { + address: rcpt.address_lcase.clone(), + typ: WebhookDSNType::TemporaryFailure, + remote_host: None, + message: "Concurrency limited".to_string(), + next_retry: DateTime::from_timestamp(domain.retry.due as i64) + .to_rfc3339() + .into(), + expires: DateTime::from_timestamp(domain.expires as i64) + .to_rfc3339() + .into(), + retry_count: domain.retry.inner.into(), + }); + } + _ => continue, + } + } + _ => continue, + } + } + + // Send webhook event + if !webhook_data.is_empty() { + self.inner + .ipc + .send_webhook( + typ, + WebhookPayload::DSN { + id: message.id, + sender: message.return_path_lcase.clone(), + status: webhook_data, + created: DateTime::from_timestamp(message.created as i64).to_rfc3339(), + }, + ) + .await; + } + } } impl Message { @@ -273,7 +406,7 @@ impl Message { // Prepare DSN let mut dsn_header = String::with_capacity(dsn.len() + 128); self.write_dsn_headers(&mut dsn_header, &reporting_mta); - let dsn = dsn_header + &dsn; + let dsn = dsn_header + dsn.as_str(); // Fetch up to 1024 bytes of message headers let headers = match core diff --git a/crates/smtp/src/queue/manager.rs b/crates/smtp/src/queue/manager.rs index 6760da3a..ab05bca2 100644 --- a/crates/smtp/src/queue/manager.rs +++ b/crates/smtp/src/queue/manager.rs @@ -165,6 +165,40 @@ impl Message { next_delivery } + pub fn next_dsn(&self) -> u64 { + let mut next_dsn = now(); + + for (pos, domain) in self + .domains + .iter() + .filter(|d| matches!(d.status, Status::Scheduled | Status::TemporaryFailure(_))) + .enumerate() + { + if pos == 0 || domain.notify.due < next_dsn { + next_dsn = domain.notify.due; + } + } + + next_dsn + } + + pub fn expires(&self) -> u64 { + let mut expires = now(); + + for (pos, domain) in self + .domains + .iter() + .filter(|d| matches!(d.status, Status::Scheduled | Status::TemporaryFailure(_))) + .enumerate() + { + if pos == 0 || domain.expires < expires { + expires = domain.expires; + } + } + + expires + } + pub fn next_event_after(&self, instant: u64) -> Option { let mut next_event = None; diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs index c96e2faa..8c32fc77 100644 --- a/crates/smtp/src/reporting/analysis.rs +++ b/crates/smtp/src/reporting/analysis.rs @@ -30,6 +30,7 @@ use std::{ }; use ahash::AHashMap; +use common::webhooks::{WebhookPayload, WebhookTlsPolicy, WebhookType}; use mail_auth::{ flate2::read::GzDecoder, report::{tlsrpt::TlsReport, ActionDisposition, DmarcResult, Feedback, Report}, @@ -233,6 +234,21 @@ impl SMTP { let report = match report.format { Format::Dmarc(_) => match Report::parse_xml(&data) { Ok(report) => { + // Send webhook + if core + .core + .has_webhook_subscribers(WebhookType::IncomingDmarcReport) + { + core.inner + .ipc + .send_webhook( + WebhookType::IncomingDmarcReport, + report.webhook_payload(), + ) + .await; + } + + // Log report.log(); Format::Dmarc(report) } @@ -248,6 +264,22 @@ impl SMTP { }, Format::Tls(_) => match TlsReport::parse_json(&data) { Ok(report) => { + // Send webhook + if core + .core + .has_webhook_subscribers(WebhookType::IncomingTlsReport) + { + core.inner + .ipc + .send_webhook( + WebhookType::IncomingTlsReport, + report.webhook_payload(), + ) + .await; + } + + // Log + report.log(); Format::Tls(report) } @@ -263,6 +295,21 @@ impl SMTP { }, Format::Arf(_) => match Feedback::parse_arf(&data) { Some(report) => { + // Send webhook + if core + .core + .has_webhook_subscribers(WebhookType::IncomingArfReport) + { + core.inner + .ipc + .send_webhook( + WebhookType::IncomingArfReport, + report.webhook_payload(), + ) + .await; + } + + // Log report.log(); Format::Arf(report.into_owned()) } @@ -339,6 +386,7 @@ impl SMTP { trait LogReport { fn log(&self); + fn webhook_payload(&self) -> WebhookPayload; } impl LogReport for Report { @@ -440,6 +488,81 @@ impl LogReport for Report { ); } } + + fn webhook_payload(&self) -> WebhookPayload { + let mut dmarc_pass = 0; + let mut dmarc_quarantine = 0; + let mut dmarc_reject = 0; + let mut dmarc_none = 0; + let mut dkim_pass = 0; + let mut dkim_fail = 0; + let mut dkim_none = 0; + let mut spf_pass = 0; + let mut spf_fail = 0; + let mut spf_none = 0; + + for record in self.records() { + let count = std::cmp::min(record.count(), 1); + + match record.action_disposition() { + ActionDisposition::Pass => { + dmarc_pass += count; + } + ActionDisposition::Quarantine => { + dmarc_quarantine += count; + } + ActionDisposition::Reject => { + dmarc_reject += count; + } + ActionDisposition::None | ActionDisposition::Unspecified => { + dmarc_none += count; + } + } + match record.dmarc_dkim_result() { + DmarcResult::Pass => { + dkim_pass += count; + } + DmarcResult::Fail => { + dkim_fail += count; + } + DmarcResult::Unspecified => { + dkim_none += count; + } + } + match record.dmarc_spf_result() { + DmarcResult::Pass => { + spf_pass += count; + } + DmarcResult::Fail => { + spf_fail += count; + } + DmarcResult::Unspecified => { + spf_none += count; + } + } + } + + let range_from = DateTime::from_timestamp(self.date_range_begin() as i64).to_rfc3339(); + let range_to = DateTime::from_timestamp(self.date_range_end() as i64).to_rfc3339(); + + WebhookPayload::IncomingDmarcReport { + range_from, + range_to, + domain: self.domain().to_string(), + report_email: self.email().to_string(), + report_id: self.report_id().to_string(), + dmarc_pass, + dmarc_quarantine, + dmarc_reject, + dmarc_none, + dkim_pass, + dkim_fail, + dkim_none, + spf_pass, + spf_fail, + spf_none, + } + } } impl LogReport for TlsReport { @@ -489,6 +612,39 @@ impl LogReport for TlsReport { } } } + + fn webhook_payload(&self) -> WebhookPayload { + let mut policies = Vec::with_capacity(self.policies.len()); + + for policy in self.policies.iter().take(5) { + let mut details = AHashMap::with_capacity(policy.failure_details.len()); + for failure in &policy.failure_details { + let num_failures = std::cmp::min(1, failure.failed_session_count); + match details.entry(failure.result_type) { + Entry::Occupied(mut e) => { + *e.get_mut() += num_failures; + } + Entry::Vacant(e) => { + e.insert(num_failures); + } + } + } + + policies.push(WebhookTlsPolicy { + range_from: self.date_range.start_datetime.to_rfc3339(), + range_to: self.date_range.end_datetime.to_rfc3339(), + domain: policy.policy.policy_domain.clone(), + report_contact: self.contact_info.clone(), + report_id: self.report_id.clone(), + policy_type: policy.policy.policy_type, + total_successes: policy.summary.total_success, + total_failures: policy.summary.total_failure, + details, + }); + } + + WebhookPayload::IncomingTlsReport { policies } + } } impl LogReport for Feedback<'_> { @@ -517,4 +673,34 @@ impl LogReport for Feedback<'_> { identity_alignment = ?self.identity_alignment(), ); } + + fn webhook_payload(&self) -> WebhookPayload { + WebhookPayload::IncomingArfReport { + feedback_type: self.feedback_type(), + arrival_date: self + .arrival_date() + .map(|a| DateTime::from_timestamp(a).to_rfc3339()), + authentication_results: self + .authentication_results() + .iter() + .map(|t| t.to_string()) + .collect(), + incidents: self.incidents(), + reported_domain: self + .reported_domain() + .iter() + .map(|t| t.to_string()) + .collect(), + reported_uri: self.reported_uri().iter().map(|t| t.to_string()).collect(), + reporting_mta: self.reporting_mta().map(|t| t.to_string()), + source_ip: self.source_ip(), + user_agent: self.user_agent().map(|t| t.to_string()), + auth_failure: self.auth_failure(), + delivery_result: self.delivery_result(), + dkim_domain: self.dkim_domain().map(|t| t.to_string()), + dkim_identity: self.dkim_identity().map(|t| t.to_string()), + dkim_selector: self.dkim_selector().map(|t| t.to_string()), + identity_alignment: self.identity_alignment(), + } + } } diff --git a/crates/smtp/src/reporting/mod.rs b/crates/smtp/src/reporting/mod.rs index fb713332..0dbed7ef 100644 --- a/crates/smtp/src/reporting/mod.rs +++ b/crates/smtp/src/reporting/mod.rs @@ -29,6 +29,8 @@ use common::{ resolver::{Policy, Tlsa}, }, expr::if_block::IfBlock, + webhooks::{WebhookPayload, WebhookType}, + USER_AGENT, }; use mail_auth::{ common::headers::HeaderWriter, @@ -47,7 +49,6 @@ use crate::{ core::{Session, SMTP}, inbound::DkimSign, queue::{DomainPart, Message}, - USER_AGENT, }; pub mod analysis; @@ -165,6 +166,36 @@ impl SMTP { } } + // Send webhook + if self + .core + .has_webhook_subscribers(WebhookType::OutgoingReport) + { + self.inner + .ipc + .send_webhook( + WebhookType::OutgoingReport, + WebhookPayload::MessageAccepted { + id: message.id, + remote_ip: None, + local_port: None, + authenticated_as: None, + return_path: message.return_path_lcase.clone(), + recipients: message + .recipients + .iter() + .map(|r| r.address_lcase.clone()) + .collect(), + next_retry: DateTime::from_timestamp(message.next_delivery_event() as i64) + .to_rfc3339(), + next_dsn: DateTime::from_timestamp(message.next_dsn() as i64).to_rfc3339(), + expires: DateTime::from_timestamp(message.expires() as i64).to_rfc3339(), + size: message.size, + }, + ) + .await; + } + // Queue message message .queue(signature.as_deref(), &report, self, span) diff --git a/crates/smtp/src/reporting/tls.rs b/crates/smtp/src/reporting/tls.rs index e2fee08b..87fcf8e1 100644 --- a/crates/smtp/src/reporting/tls.rs +++ b/crates/smtp/src/reporting/tls.rs @@ -24,9 +24,12 @@ use std::{collections::hash_map::Entry, sync::Arc, time::Duration}; use ahash::AHashMap; -use common::config::smtp::{ - report::AggregateFrequency, - resolver::{Mode, MxPattern}, +use common::{ + config::smtp::{ + report::AggregateFrequency, + resolver::{Mode, MxPattern}, + }, + USER_AGENT, }; use mail_auth::{ flate2::{write::GzEncoder, Compression}, @@ -44,7 +47,7 @@ use store::{ Deserialize, IterateParams, Serialize, ValueKey, }; -use crate::{core::SMTP, queue::RecipientDomain, USER_AGENT}; +use crate::{core::SMTP, queue::RecipientDomain}; use super::{scheduler::ToHash, AggregateTimestamp, ReportLock, SerializedSize, TlsEvent}; diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs index 2f680727..7ee087a1 100644 --- a/crates/smtp/src/scripts/event_loop.rs +++ b/crates/smtp/src/scripts/event_loop.rs @@ -123,6 +123,7 @@ impl SMTP { PluginContext { span: &span, core: &self.core, + cache: &self.inner.script_cache, message: instance.message(), modifications: &mut modifications, arguments, diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index a5e4b428..ffde0305 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -10,7 +10,7 @@ nlp = { path = "../nlp" } rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] } foundationdb = { version = "0.9.0", features = ["embedded-fdb-include", "fdb-7_1"], optional = true } rusqlite = { version = "0.31.0", features = ["bundled"], optional = true } -rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls", "no-verify-ssl"], optional = true } +rust-s3 = { version = "0.34.0", default-features = false, features = ["tokio-rustls-tls", "no-verify-ssl"], optional = true } tokio = { version = "1.23", features = ["sync", "fs", "io-util"] } r2d2 = { version = "0.8.10", optional = true } futures = { version = "0.3", optional = true } @@ -28,7 +28,7 @@ num_cpus = { version = "1.15.0", optional = true } blake3 = "1.3.3" tracing = "0.1" lz4_flex = { version = "0.11", default-features = false } -deadpool-postgres = { version = "0.12.1", optional = true } +deadpool-postgres = { version = "0.14", optional = true } tokio-postgres = { version = "0.7.10", optional = true } tokio-rustls = { version = "0.25.0", optional = true } rustls = { version = "0.22.0", optional = true } @@ -42,7 +42,7 @@ regex = "1.7.0" flate2 = "1.0" async-trait = "0.1.68" redis = { version = "0.25.2", features = [ "tokio-comp", "tokio-rustls-comp", "tls-rustls-insecure", "tls-rustls-webpki-roots", "cluster-async"], optional = true } -deadpool = { version = "0.10.0", features = ["managed"], optional = true } +deadpool = { version = "0.12", features = ["managed"], optional = true } bincode = "1.3.3" arc-swap = "1.6.0" bitpacking = "0.9.2" diff --git a/crates/store/src/backend/redis/pool.rs b/crates/store/src/backend/redis/pool.rs index ca904e31..041a8801 100644 --- a/crates/store/src/backend/redis/pool.rs +++ b/crates/store/src/backend/redis/pool.rs @@ -21,7 +21,6 @@ * for more details. */ -use async_trait::async_trait; use deadpool::managed; use redis::{ aio::{ConnectionLike, MultiplexedConnection}, @@ -30,7 +29,6 @@ use redis::{ use super::{RedisClusterConnectionManager, RedisConnectionManager}; -#[async_trait] impl managed::Manager for RedisConnectionManager { type Type = MultiplexedConnection; type Error = crate::Error; @@ -58,7 +56,6 @@ impl managed::Manager for RedisConnectionManager { } } -#[async_trait] impl managed::Manager for RedisClusterConnectionManager { type Type = ClusterConnection; type Error = crate::Error; diff --git a/crates/store/src/backend/s3/mod.rs b/crates/store/src/backend/s3/mod.rs index 28dd07d9..b677559c 100644 --- a/crates/store/src/backend/s3/mod.rs +++ b/crates/store/src/backend/s3/mod.rs @@ -80,7 +80,11 @@ impl S3Store { }) .ok()? .with_path_style() - .with_request_timeout(timeout), + .with_request_timeout(timeout) + .map_err(|err| { + config.new_build_error(prefix.as_str(), format!("Failed to create bucket: {err:?}")) + }) + .ok()?, prefix: config.value((&prefix, "key-prefix")).map(|s| s.to_string()), }) } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index dbe7350c..45120716 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -25,7 +25,7 @@ jmap_proto = { path = "../crates/jmap-proto" } imap = { path = "../crates/imap", features = ["test_mode"] } imap_proto = { path = "../crates/imap-proto" } pop3 = { path = "../crates/pop3", features = ["test_mode"] } -smtp = { path = "../crates/smtp", features = ["test_mode", "local_delivery"] } +smtp = { path = "../crates/smtp", features = ["test_mode"] } common = { path = "../crates/common", features = ["test_mode"] } managesieve = { path = "../crates/managesieve", features = ["test_mode"] } smtp-proto = { version = "0.1" } @@ -61,6 +61,7 @@ serial_test = "3.0.0" num_cpus = "1.15.0" async-trait = "0.1.68" chrono = "0.4" +ring = { version = "0.17" } [target.'cfg(not(target_env = "msvc"))'.dependencies] jemallocator = "0.5.0" diff --git a/tests/src/imap/mod.rs b/tests/src/imap/mod.rs index 0ca863e0..c8316322 100644 --- a/tests/src/imap/mod.rs +++ b/tests/src/imap/mod.rs @@ -45,7 +45,8 @@ use std::{ use ::managesieve::core::ManageSieveSessionManager; use common::{ config::server::{ServerProtocol, Servers}, - Core, + webhooks::manager::spawn_webhook_manager, + Core, Ipc, IPC_CHANNEL_BUFFER, }; use ::store::Stores; @@ -53,7 +54,7 @@ use ahash::AHashSet; use directory::backend::internal::manage::ManageDirectory; use imap::core::{ImapSessionManager, Inner, IMAP}; use imap_proto::ResponseType; -use jmap::{api::JmapSessionManager, services::IPC_CHANNEL_BUFFER, JMAP}; +use jmap::{api::JmapSessionManager, JMAP}; use pop3::Pop3SessionManager; use smtp::core::{SmtpSessionManager, SMTP}; use tokio::{ @@ -321,9 +322,18 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest { // Parse acceptors servers.parse_tcp_acceptors(&mut config, shared_core.clone()); - // Init servers + // Spawn webhook manager + let webhook_tx = spawn_webhook_manager(shared_core.clone()); + + // Setup IPC channels let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER); - let smtp = SMTP::init(&mut config, shared_core.clone(), delivery_tx).await; + let ipc = Ipc { + delivery_tx, + webhook_tx, + }; + + // Init servers + let smtp = SMTP::init(&mut config, shared_core.clone(), ipc).await; let jmap = JMAP::init( &mut config, delivery_rx, @@ -369,6 +379,7 @@ async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest { ), }; }); + // Create tables and test accounts let lookup = DirectoryStore { store: shared_core diff --git a/tests/src/jmap/auth_limits.rs b/tests/src/jmap/auth_limits.rs index 70d2b1f9..977414b6 100644 --- a/tests/src/jmap/auth_limits.rs +++ b/tests/src/jmap/auth_limits.rs @@ -71,6 +71,16 @@ pub async fn test(params: &mut JMAPTest) { // Reset rate limiters server.inner.concurrency_limiter.clear(); + params.webhook.clear(); + + // Incorrect passwords should be rejected with a 401 error + assert!(matches!( + Client::new() + .credentials(Credentials::basic("jdoe@example.com", "abcde")) + .accept_invalid_certs(true) + .connect("https://127.0.0.1:8899") + .await, + Err(jmap_client::Error::Problem(err)) if err.status() == Some(401))); // Wait until the beginning of the 5 seconds bucket const LIMIT: u64 = 5; @@ -79,15 +89,6 @@ pub async fn test(params: &mut JMAPTest) { let range_end = (range_start * LIMIT) + LIMIT; tokio::time::sleep(Duration::from_secs(range_end - now)).await; - // Incorrect passwords should be rejected with a 401 error - assert!(matches!( - Client::new() - .credentials(Credentials::basic("jdoe@example.com", "abcde")) - .accept_invalid_certs(true) - .connect("https://127.0.0.1:8899") - .await, - Err(jmap_client::Error::Problem(err)) if err.status() == Some(401))); - // Invalid authentication requests should be rate limited let mut n_401 = 0; let mut n_429 = 0; @@ -280,4 +281,13 @@ pub async fn test(params: &mut JMAPTest) { params.client.set_default_account_id(&account_id); destroy_all_mailboxes(params).await; assert_is_empty(server).await; + + // Check webhook events + params.webhook.assert_contains(&[ + "auth.failure", + "auth.success", + "auth.banned", + "\"login\": \"jdoe@example.com\"", + "\"accountType\": \"individual\"", + ]); } diff --git a/tests/src/jmap/delivery.rs b/tests/src/jmap/delivery.rs index 552def56..3cb267e0 100644 --- a/tests/src/jmap/delivery.rs +++ b/tests/src/jmap/delivery.rs @@ -104,6 +104,7 @@ pub async fn test(params: &mut JMAPTest) { // Delivering to individuals let mut lmtp = SmtpConnection::connect().await; + params.webhook.clear(); lmtp.ingest( "bill@example.com", @@ -320,6 +321,17 @@ pub async fn test(params: &mut JMAPTest) { destroy_all_mailboxes(params).await; } assert_is_empty(server).await; + + // Check webhook events + params.webhook.assert_contains(&[ + "message.accepted", + "message.appended", + "dsn", + "\"returnPath\": \"bill@example.com\"", + "\"sender\": \"bill@example.com\"", + "\"address\": \"john.doe@example.com\"", + "\"type\": \"success\"", + ]); } pub struct SmtpConnection { diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index 952ad976..f406c6ad 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -30,15 +30,12 @@ use base64::{ use common::{ config::server::{ServerProtocol, Servers}, manager::config::{ConfigManager, Patterns}, - Core, + webhooks::manager::spawn_webhook_manager, + Core, Ipc, IPC_CHANNEL_BUFFER, }; use hyper::{header::AUTHORIZATION, Method}; use imap::core::{ImapSessionManager, IMAP}; -use jmap::{ - api::JmapSessionManager, - services::{housekeeper::Event, IPC_CHANNEL_BUFFER}, - JMAP, -}; +use jmap::{api::JmapSessionManager, services::housekeeper::Event, JMAP}; use jmap_client::client::{Client, Credentials}; use jmap_proto::{error::request::RequestError, types::id::Id}; use managesieve::core::ManageSieveSessionManager; @@ -54,6 +51,7 @@ use store::{ }; use tokio::sync::{mpsc, watch}; use utils::config::Config; +use webhooks::{spawn_mock_webhook_endpoint, MockWebhookEndpoint}; use crate::{add_test_certs, directory::DirectoryStore, store::TempDir, AssertConfig}; @@ -82,6 +80,7 @@ pub mod stress_test; pub mod thread_get; pub mod thread_merge; pub mod vacation_response; +pub mod webhooks; pub mod websocket; const SERVER: &str = r#" @@ -287,6 +286,15 @@ refresh-token-renew = "2s" expn = true vrfy = true +[webhook."test"] +url = "http://127.0.0.1:8821/hook" +events = ["auth.success", "auth.failure", "auth.banned", "auth.error", + "message.accepted", "message.rejected", "message.appended", + "account.over-quota", "dsn", "double-bounce", "report.incoming.dmarc", + "report.incoming.tls", "report.incoming.arf", "report.outgoing"] +signature-key = "ovos-moles" +throttle = "100ms" + "#; #[tokio::test(flavor = "multi_thread")] @@ -314,6 +322,7 @@ pub async fn jmap_tests() { ) .await; + webhooks::test(&mut params).await; email_query::test(&mut params, delete).await; email_get::test(&mut params).await; email_set::test(&mut params).await; @@ -379,6 +388,7 @@ pub struct JMAPTest { client: Client, directory: DirectoryStore, temp_dir: TempDir, + webhook: Arc, shutdown_tx: watch::Sender, } @@ -485,9 +495,18 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest { // Parse acceptors servers.parse_tcp_acceptors(&mut config, shared_core.clone()); - // Init servers + // Spawn webhook manager + let webhook_tx = spawn_webhook_manager(shared_core.clone()); + + // Setup IPC channels let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER); - let smtp = SMTP::init(&mut config, shared_core.clone(), delivery_tx).await; + let ipc = Ipc { + delivery_tx, + webhook_tx, + }; + + // Init servers + let smtp = SMTP::init(&mut config, shared_core.clone(), ipc).await; let jmap = JMAP::init( &mut config, delivery_rx, @@ -569,6 +588,7 @@ async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest { client, directory, shutdown_tx, + webhook: spawn_mock_webhook_endpoint(), } } diff --git a/tests/src/jmap/thread_merge.rs b/tests/src/jmap/thread_merge.rs index 798cae31..b8f06e4a 100644 --- a/tests/src/jmap/thread_merge.rs +++ b/tests/src/jmap/thread_merge.rs @@ -27,7 +27,10 @@ use crate::{ jmap::{assert_is_empty, mailbox::destroy_all_mailboxes}, store::deflate_test_resource, }; -use jmap::{email::ingest::IngestEmail, IngestError}; +use jmap::{ + email::ingest::{IngestEmail, IngestSource}, + IngestError, +}; use jmap_client::{email, mailbox::Role}; use jmap_proto::types::{collection::Collection, id::Id}; use mail_parser::{mailbox::mbox::MessageIterator, MessageParser}; @@ -264,7 +267,7 @@ async fn test_multi_thread(params: &mut JMAPTest) { mailbox_ids: vec![mailbox_id], keywords: vec![], received_at: None, - skip_duplicates: true, + source: IngestSource::Smtp, encrypt: false, }) .await diff --git a/tests/src/jmap/webhooks.rs b/tests/src/jmap/webhooks.rs new file mode 100644 index 00000000..b55f0b1a --- /dev/null +++ b/tests/src/jmap/webhooks.rs @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + +use base64::{engine::general_purpose::STANDARD, Engine}; +use common::{ + manager::webadmin::Resource, + webhooks::{WebhookEvent, WebhookEvents}, +}; +use hyper::{body, server::conn::http1, service::service_fn}; +use hyper_util::rt::TokioIo; +use jmap::api::http::{fetch_body, ToHttpResponse}; +use jmap_proto::error::request::RequestError; +use ring::hmac; +use store::parking_lot::Mutex; +use tokio::{net::TcpListener, sync::watch}; + +use super::JMAPTest; + +pub struct MockWebhookEndpoint { + pub tx: watch::Sender, + pub events: Mutex>, + pub reject: AtomicBool, +} + +pub async fn test(params: &mut JMAPTest) { + println!("Running Webhook tests..."); + + // Webhooks endpoint starts disabled by default, make sure there are no events. + tokio::time::sleep(Duration::from_millis(200)).await; + params.webhook.assert_is_empty(); + + // Enable the endpoint + params.webhook.accept(); + tokio::time::sleep(Duration::from_millis(1000)).await; + + // Check for events + params.webhook.assert_contains(&["auth.success"]); +} + +impl MockWebhookEndpoint { + pub fn assert_contains(&self, expected: &[&str]) { + let events = + serde_json::to_string_pretty(&self.events.lock().drain(..).collect::>()) + .unwrap(); + + for string in expected { + if !events.contains(string) { + panic!( + "Expected events to contain '{}', but it did not. Events: {}", + string, events + ); + } + } + } + + pub fn accept(&self) { + self.reject.store(false, Ordering::Relaxed); + } + + pub fn reject(&self) { + self.reject.store(true, Ordering::Relaxed); + } + + pub fn clear(&self) { + self.events.lock().clear(); + } + + pub fn assert_is_empty(&self) { + assert!(self.events.lock().is_empty()); + } +} + +pub fn spawn_mock_webhook_endpoint() -> Arc { + let (tx, rx) = watch::channel(true); + let endpoint_ = Arc::new(MockWebhookEndpoint { + tx, + events: Mutex::new(vec![]), + reject: true.into(), + }); + + let endpoint = endpoint_.clone(); + + tokio::spawn(async move { + let listener = TcpListener::bind("127.0.0.1:8821") + .await + .unwrap_or_else(|e| { + panic!("Failed to bind mock Milter server to 127.0.0.1:8821: {e}"); + }); + let mut rx_ = rx.clone(); + + loop { + tokio::select! { + stream = listener.accept() => { + match stream { + Ok((stream, _)) => { + + let _ = http1::Builder::new() + .keep_alive(false) + .serve_connection( + TokioIo::new(stream), + service_fn(|mut req: hyper::Request| { + let endpoint = endpoint.clone(); + + async move { + // Verify HMAC signature + let key = hmac::Key::new(hmac::HMAC_SHA256, "ovos-moles".as_bytes()); + let body = fetch_body(&mut req, 1024 * 1024).await.unwrap(); + let tag = STANDARD.decode(req.headers().get("X-Signature").unwrap().to_str().unwrap()).unwrap(); + hmac::verify(&key, &body, &tag).expect("Invalid signature"); + + // Deserialize JSON + let request = serde_json::from_slice::(&body) + .expect("Failed to parse JSON"); + + if !endpoint.reject.load(Ordering::Relaxed) { + //let c = print!("received webhook: {}", serde_json::to_string_pretty(&request).unwrap()); + + // Add events + endpoint.events.lock().extend(request.events); + + Ok::<_, hyper::Error>( + Resource { + content_type: "application/json", + contents: "[]".to_string().into_bytes(), + } + .into_http_response(), + ) + } else { + //let c = print!("rejected webhook: {}", serde_json::to_string_pretty(&request).unwrap()); + + Ok::<_, hyper::Error>( + RequestError::not_found().into_http_response() + ) + } + + } + }), + ) + .await; + } + Err(err) => { + panic!("Something went wrong: {err}" ); + } + } + }, + _ = rx_.changed() => { + //println!("Mock jMilter server stopping"); + break; + } + }; + } + }); + + endpoint_ +}