RocksDB backend re-implementation

This commit is contained in:
mdecimus 2023-12-02 15:52:05 +01:00
parent 4a00bbb79e
commit 5010c15037
27 changed files with 1185 additions and 1025 deletions

282
Cargo.lock generated
View file

@ -422,12 +422,6 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "base64"
version = "0.21.5"
@ -475,26 +469,6 @@ dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.64.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4"
dependencies = [
"bitflags 1.3.2",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"peeking_take_while",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 1.0.109",
]
[[package]]
name = "bindgen"
version = "0.65.1"
@ -518,6 +492,26 @@ dependencies = [
"which",
]
[[package]]
name = "bindgen"
version = "0.69.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ffcebc3849946a7170a05992aac39da343a90676ab392c51a4280981d6379c2"
dependencies = [
"bitflags 2.4.1",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"peeking_take_while",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 2.0.39",
]
[[package]]
name = "bit-set"
version = "0.5.3"
@ -886,9 +880,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.4.8"
version = "4.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2275f18819641850fa26c89acc84d465c1bf91ce57bc2748b28c420473352f64"
checksum = "41fffed7514f420abec6d183b1d3acfd9099c79c3a10a06ade4f8203f1411272"
dependencies = [
"clap_builder",
"clap_derive",
@ -896,9 +890,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.4.8"
version = "4.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07cdf1b148b25c1e1f7a42225e30a0d99a615cd4637eae7365548dd4529b95bc"
checksum = "63361bae7eef3771745f02d8d892bec2fee5f6e34af316ba556e7f97a7069ff1"
dependencies = [
"anstream",
"anstyle",
@ -989,9 +983,9 @@ checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2"
[[package]]
name = "core-foundation"
version = "0.9.3"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146"
checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f"
dependencies = [
"core-foundation-sys",
"libc",
@ -999,9 +993,9 @@ dependencies = [
[[package]]
name = "core-foundation-sys"
version = "0.8.4"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
[[package]]
name = "cpufeatures"
@ -1672,12 +1666,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.7"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8"
checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
dependencies = [
"libc",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@ -2236,7 +2230,7 @@ dependencies = [
"rand",
"ring 0.16.20",
"rustls 0.21.9",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"thiserror",
"tinyvec",
"tokio",
@ -2560,7 +2554,7 @@ dependencies = [
"parking_lot",
"rand",
"rustls 0.21.9",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"store",
"tokio",
"tokio-rustls",
@ -2829,9 +2823,9 @@ dependencies = [
[[package]]
name = "js-sys"
version = "0.3.65"
version = "0.3.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8"
checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca"
dependencies = [
"wasm-bindgen",
]
@ -2847,9 +2841,9 @@ dependencies = [
[[package]]
name = "konst"
version = "0.3.6"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "030400e39b2dff8beaa55986a17e0014ad657f569ca92426aafcb5e8e71faee7"
checksum = "29a6ee015a18f4a121ba670f947f801b207139f0f810b3cd266e319065129782"
dependencies = [
"const_panic",
"konst_kernel",
@ -2980,11 +2974,11 @@ dependencies = [
[[package]]
name = "librocksdb-sys"
version = "0.10.0+7.9.2"
version = "0.11.0+8.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fe4d5874f5ff2bc616e55e8c6086d478fcda13faf9495768a4aa1c22042d30b"
checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e"
dependencies = [
"bindgen 0.64.0",
"bindgen 0.65.1",
"bzip2-sys",
"cc",
"glob",
@ -3024,9 +3018,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linux-raw-sys"
version = "0.4.11"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829"
checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456"
[[package]]
name = "lock_api"
@ -3094,8 +3088,8 @@ dependencies = [
"mail-parser",
"parking_lot",
"quick-xml 0.30.0",
"ring 0.17.5",
"rustls-pemfile",
"ring 0.17.6",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"zip",
@ -3120,17 +3114,17 @@ dependencies = [
[[package]]
name = "mail-send"
version = "0.4.1"
source = "git+https://github.com/stalwartlabs/mail-send#53904ce6cf4fcb9a42a92a541f5d64d4d972d6cb"
version = "0.4.2"
source = "git+https://github.com/stalwartlabs/mail-send#09981bceec74b2da9522c3aaadcd675e612d1653"
dependencies = [
"base64 0.20.0",
"base64 0.21.5",
"gethostname",
"md5",
"rustls 0.21.9",
"smtp-proto",
"tokio",
"tokio-rustls",
"webpki-roots 0.25.3",
"webpki-roots 0.26.0",
]
[[package]]
@ -3166,7 +3160,7 @@ dependencies = [
"md5",
"parking_lot",
"rustls 0.21.9",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"sieve-rs",
"store",
"tokio",
@ -3354,7 +3348,7 @@ dependencies = [
"pin-project",
"rand",
"rustls 0.21.9",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"socket2 0.5.5",
@ -3376,7 +3370,7 @@ checksum = "06f19e4cfa0ab5a76b627cec2d81331c49b034988eaf302c3bafeada684eadef"
dependencies = [
"base64 0.21.5",
"bigdecimal",
"bindgen 0.65.1",
"bindgen 0.69.1",
"bitflags 2.4.1",
"bitvec",
"btoi",
@ -4494,7 +4488,7 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"rustls 0.21.9",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"serde_urlencoded",
@ -4555,9 +4549,9 @@ dependencies = [
[[package]]
name = "ring"
version = "0.17.5"
version = "0.17.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b"
checksum = "684d5e6e18f669ccebf64a92236bb7db9a34f07be010e3627368182027180866"
dependencies = [
"cc",
"getrandom",
@ -4617,9 +4611,9 @@ dependencies = [
[[package]]
name = "rocksdb"
version = "0.20.1"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "015439787fce1e75d55f279078d33ff14b4af5d93d995e8838ee4631301c8a99"
checksum = "bb6f170a4041d50a0ce04b0d2e14916d6ca863ea2e422689a5b694395d299ffe"
dependencies = [
"libc",
"librocksdb-sys",
@ -4638,9 +4632,9 @@ dependencies = [
[[package]]
name = "rsa"
version = "0.9.4"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a3211b01eea83d80687da9eef70e39d65144a3894866a5153a2723e425a157f"
checksum = "af6c4b23d99685a1408194da11270ef8e9809aff951cc70ec9b17350b087e474"
dependencies = [
"const-oid",
"digest 0.10.7",
@ -4780,15 +4774,15 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.25"
version = "0.38.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc99bc2d4f1fed22595588a013687477aedf3cdcfb26558c559edb67b4d9b22e"
checksum = "9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a"
dependencies = [
"bitflags 2.4.1",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@ -4810,7 +4804,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9"
dependencies = [
"log",
"ring 0.17.5",
"ring 0.17.6",
"rustls-webpki",
"sct",
]
@ -4822,7 +4816,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
dependencies = [
"openssl-probe",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"schannel",
"security-framework",
]
@ -4836,13 +4830,29 @@ dependencies = [
"base64 0.21.5",
]
[[package]]
name = "rustls-pemfile"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4"
dependencies = [
"base64 0.21.5",
"rustls-pki-types",
]
[[package]]
name = "rustls-pki-types"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb0a1f9b9efec70d32e6d6aa3e58ebd88c3754ec98dfe9145c63cf54cc829b83"
[[package]]
name = "rustls-webpki"
version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [
"ring 0.17.5",
"ring 0.17.6",
"untrusted 0.9.0",
]
@ -4915,7 +4925,7 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [
"ring 0.17.5",
"ring 0.17.6",
"untrusted 0.9.0",
]
@ -5276,7 +5286,7 @@ dependencies = [
"regex",
"reqwest",
"rustls 0.21.9",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"sha1",
@ -5359,9 +5369,9 @@ dependencies = [
[[package]]
name = "spki"
version = "0.7.2"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a"
checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d"
dependencies = [
"base64ct",
"der",
@ -5420,7 +5430,7 @@ dependencies = [
"paste",
"percent-encoding",
"rustls 0.21.9",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"sha2 0.10.8",
@ -5629,7 +5639,6 @@ name = "store"
version = "0.1.0"
dependencies = [
"ahash 0.8.6",
"async-trait",
"blake3",
"deadpool-postgres",
"farmhash",
@ -5645,7 +5654,7 @@ dependencies = [
"r2d2",
"rand",
"rayon",
"ring 0.17.5",
"ring 0.17.6",
"roaring",
"rocksdb",
"rusqlite",
@ -5862,7 +5871,7 @@ dependencies = [
"rayon",
"reqwest",
"rustls 0.21.9",
"rustls-pemfile",
"rustls-pemfile 2.0.0",
"serde",
"serde_json",
"serial_test",
@ -6488,7 +6497,7 @@ dependencies = [
"privdrop",
"rand",
"rustls 0.21.9",
"rustls-pemfile",
"rustls-pemfile 2.0.0",
"serde",
"smtp-proto",
"tokio",
@ -6498,7 +6507,7 @@ dependencies = [
"tracing-journald",
"tracing-opentelemetry",
"tracing-subscriber",
"webpki-roots 0.25.3",
"webpki-roots 0.26.0",
]
[[package]]
@ -6545,9 +6554,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.88"
version = "0.2.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce"
checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
@ -6555,9 +6564,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.88"
version = "0.2.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217"
checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826"
dependencies = [
"bumpalo",
"log",
@ -6570,9 +6579,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.38"
version = "0.4.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9afec9963e3d0994cac82455b2b3502b81a7f40f9a0d32181f7528d9f4b43e02"
checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12"
dependencies = [
"cfg-if",
"js-sys",
@ -6582,9 +6591,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.88"
version = "0.2.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2"
checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@ -6592,9 +6601,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.88"
version = "0.2.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907"
checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283"
dependencies = [
"proc-macro2",
"quote",
@ -6605,9 +6614,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.88"
version = "0.2.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b"
checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f"
[[package]]
name = "wasm-streams"
@ -6624,9 +6633,9 @@ dependencies = [
[[package]]
name = "web-sys"
version = "0.3.65"
version = "0.3.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85"
checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f"
dependencies = [
"js-sys",
"wasm-bindgen",
@ -6648,7 +6657,7 @@ version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53"
dependencies = [
"ring 0.17.5",
"ring 0.17.6",
"untrusted 0.9.0",
]
@ -6676,6 +6685,15 @@ version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10"
[[package]]
name = "webpki-roots"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de2cfda980f21be5a7ed2eadb3e6fe074d56022bea2cdeb1a62eb220fc04188"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "whatlang"
version = "0.16.3"
@ -6772,6 +6790,15 @@ dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.0",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
@ -6802,6 +6829,21 @@ dependencies = [
"windows_x86_64_msvc 0.48.5",
]
[[package]]
name = "windows-targets"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd"
dependencies = [
"windows_aarch64_gnullvm 0.52.0",
"windows_aarch64_msvc 0.52.0",
"windows_i686_gnu 0.52.0",
"windows_i686_msvc 0.52.0",
"windows_x86_64_gnu 0.52.0",
"windows_x86_64_gnullvm 0.52.0",
"windows_x86_64_msvc 0.52.0",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
@ -6814,6 +6856,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
@ -6826,6 +6874,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
@ -6838,6 +6892,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
@ -6850,6 +6910,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
@ -6862,6 +6928,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
@ -6874,6 +6946,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
@ -6886,6 +6964,12 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
[[package]]
name = "winnow"
version = "0.5.19"
@ -6965,18 +7049,18 @@ checksum = "9828b178da53440fa9c766a3d2f73f7cf5d0ac1fe3980c1e5018d899fd19e07b"
[[package]]
name = "zerocopy"
version = "0.7.26"
version = "0.7.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e97e415490559a91254a2979b4829267a57d2fcd741a98eee8b722fb57289aa0"
checksum = "7d6f15f7ade05d2a4935e34a457b936c23dc70a05cc1d97133dc99e7a3fe0f0e"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.26"
version = "0.7.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f"
checksum = "dbbad221e3f78500350ecbd7dfa4e63ef945c05f4c61cb7f4d3f84cd0bba649b"
dependencies = [
"proc-macro2",
"quote",

View file

@ -48,10 +48,7 @@ use services::{
};
use smtp::core::SMTP;
use store::{
backend::{
foundationdb::FdbStore, fs::FsStore, mysql::MysqlStore, postgres::PostgresStore,
sqlite::SqliteStore,
},
backend::rocksdb::RocksDbStore,
fts::FtsFilter,
parking_lot::Mutex,
query::{sort::Pagination, Comparator, Filter, ResultSet, SortedResultSet},
@ -203,21 +200,26 @@ impl JMAP {
PostgresStore::open(config)
.await
.failed("Unable to open database"),
));
let store = Store::SQLite(Arc::new(
));*/
/*let store = Store::SQLite(Arc::new(
SqliteStore::open(config)
.await
.failed("Unable to open database"),
));
let store = Store::FoundationDb(Arc::new(
));*/
/*let store = Store::FoundationDb(Arc::new(
FdbStore::open(config)
.await
.failed("Unable to open database"),
));*/
let store = Store::MySQL(Arc::new(
/*let store = Store::MySQL(Arc::new(
MysqlStore::open(config)
.await
.failed("Unable to open database"),
));*/
let store = Store::RocksDb(Arc::new(
RocksDbStore::open(config)
.await
.failed("Unable to open database"),
));
let blob_store = store.clone().into();
/*let blob_store = BlobStore::Fs(Arc::new(

View file

@ -31,7 +31,10 @@ tracing = "0.1"
jemallocator = "0.5.0"
[features]
default = ["sqlite", "foundationdb"]
#default = ["sqlite", "foundationdb", "postgres", "mysql"]
default = ["rocks"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation"]
postgres = ["store/postgres"]
mysql = ["store/mysql"]
rocks = ["store/rocks"]

View file

@ -7,7 +7,7 @@ resolver = "2"
[dependencies]
utils = { path = "../utils" }
nlp = { path = "../nlp" }
rocksdb = { version = "0.20.1", optional = true }
rocksdb = { version = "0.21", optional = true, features = ["multi-threaded-cf"] }
foundationdb = { version = "0.8.0", features = ["embedded-fdb-include"], optional = true }
rusqlite = { version = "0.29.0", features = ["bundled"], optional = true }
rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls"] }
@ -27,23 +27,24 @@ lru-cache = { version = "0.1.2", optional = true }
num_cpus = { version = "1.15.0", optional = true }
blake3 = "1.3.3"
tracing = "0.1"
async-trait = "0.1.68"
lz4_flex = { version = "0.11" }
deadpool-postgres = "0.11.0"
tokio-postgres = "0.7.10"
tokio-rustls = { version = "0.24.0"}
rustls = "0.21.0"
ring = "0.17"
mysql_async = { version = "*", default-features = false, features = ["default-rustls"] }
deadpool-postgres = { version = "0.11.0", optional = true }
tokio-postgres = { version = "0.7.10", optional = true }
tokio-rustls = { version = "0.24.0", optional = true }
rustls = { version = "0.21.0", optional = true }
ring = { version = "0.17", optional = true }
mysql_async = { version = "0.33", default-features = false, features = ["default-rustls"], optional = true }
[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }
[features]
rocks = ["rocksdb", "rayon"]
rocks = ["rocksdb", "rayon", "num_cpus"]
sqlite = ["rusqlite", "rayon", "r2d2", "num_cpus", "lru-cache"]
postgres = ["tokio-postgres", "deadpool-postgres", "tokio-rustls", "rustls", "ring", "futures"]
mysql = ["mysql_async"]
foundation = ["foundationdb", "futures"]
backend = []
test_mode = []

View file

@ -23,15 +23,18 @@
#[cfg(feature = "foundation")]
pub mod foundationdb;
pub mod fs;
#[cfg(feature = "mysql")]
pub mod mysql;
#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "rocks")]
pub mod rocksdb;
pub mod s3;
#[cfg(feature = "sqlite")]
pub mod sqlite;
pub mod fs;
pub mod s3;
pub const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 1) as usize;
pub const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1;

View file

@ -27,7 +27,6 @@ pub mod blob;
pub mod main;
pub mod purge;
pub mod read;
pub mod tls;
pub mod write;
pub struct MysqlStore {

View file

@ -1,152 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
// Credits: https://github.com/jbg/tokio-postgres-rustls
use std::{
convert::TryFrom,
future::Future,
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::future::{FutureExt, TryFutureExt};
use ring::digest;
use rustls::{ClientConfig, ServerName};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_postgres::tls::{ChannelBinding, MakeTlsConnect, TlsConnect};
use tokio_rustls::{client::TlsStream, TlsConnector};
/// Factory that produces rustls-backed TLS connectors for `tokio_postgres`.
///
/// Holds the client TLS configuration behind an [`Arc`] so that cloning the
/// factory (one clone per connection attempt) is cheap.
#[derive(Clone)]
pub struct MakeRustlsConnect {
// Shared TLS client configuration reused by every connection.
config: Arc<ClientConfig>,
}
impl MakeRustlsConnect {
pub fn new(config: ClientConfig) -> Self {
Self {
config: Arc::new(config),
}
}
}
impl<S> MakeTlsConnect<S> for MakeRustlsConnect
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Stream = RustlsStream<S>;
type TlsConnect = RustlsConnect;
type Error = io::Error;
fn make_tls_connect(&mut self, hostname: &str) -> io::Result<RustlsConnect> {
ServerName::try_from(hostname)
.map(|dns_name| {
RustlsConnect(Some(RustlsConnectData {
hostname: dns_name,
connector: Arc::clone(&self.config).into(),
}))
})
.or(Ok(RustlsConnect(None)))
}
}
/// Per-connection TLS connector; `None` records an invalid server name,
/// which makes the subsequent connect attempt fail with `InvalidInput`.
pub struct RustlsConnect(Option<RustlsConnectData>);
// Everything needed to start one TLS handshake.
struct RustlsConnectData {
// Validated server name for certificate verification / SNI.
hostname: ServerName,
// rustls connector built from the shared client configuration.
connector: TlsConnector,
}
impl<S> TlsConnect<S> for RustlsConnect
where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    type Stream = RustlsStream<S>;
    type Error = io::Error;
    type Future = Pin<Box<dyn Future<Output = io::Result<RustlsStream<S>>> + Send>>;

    /// Runs the TLS handshake over `stream`.
    ///
    /// If the connector was built from an invalid server name (`None`),
    /// the returned future resolves immediately to an `InvalidInput` error.
    fn connect(self, stream: S) -> Self::Future {
        if let Some(data) = self.0 {
            Box::pin(async move {
                let tls = data.connector.connect(data.hostname, stream).await?;
                Ok(RustlsStream(Box::pin(tls)))
            })
        } else {
            Box::pin(core::future::ready(Err(io::ErrorKind::InvalidInput.into())))
        }
    }
}
/// Established TLS stream, pinned so its poll methods can delegate directly
/// to the inner `tokio_rustls` stream.
pub struct RustlsStream<S>(Pin<Box<TlsStream<S>>>);
impl<S> tokio_postgres::tls::TlsStream for RustlsStream<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    /// Computes the `tls-server-end-point` channel binding from the peer's
    /// leaf certificate (SHA-256 of the DER bytes), or reports no binding
    /// when the peer presented no certificates.
    fn channel_binding(&self) -> ChannelBinding {
        let (_, session) = self.0.get_ref();
        session
            .peer_certificates()
            .and_then(|certs| certs.first())
            .map(|leaf| {
                let fingerprint = digest::digest(&digest::SHA256, leaf.as_ref());
                ChannelBinding::tls_server_end_point(fingerprint.as_ref().into())
            })
            .unwrap_or_else(ChannelBinding::none)
    }
}
/// Read half of [`RustlsStream`]: every poll is forwarded unchanged to the
/// wrapped TLS stream.
impl<S> AsyncRead for RustlsStream<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        ctx: &mut Context,
        out: &mut ReadBuf<'_>,
    ) -> Poll<tokio::io::Result<()>> {
        let inner = self.0.as_mut();
        inner.poll_read(ctx, out)
    }
}
/// Write half of [`RustlsStream`]: write, flush, and shutdown all delegate
/// to the wrapped TLS stream with no extra buffering.
impl<S> AsyncWrite for RustlsStream<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        ctx: &mut Context,
        payload: &[u8],
    ) -> Poll<tokio::io::Result<usize>> {
        let inner = self.0.as_mut();
        inner.poll_write(ctx, payload)
    }

    fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<tokio::io::Result<()>> {
        let inner = self.0.as_mut();
        inner.poll_flush(ctx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<tokio::io::Result<()>> {
        let inner = self.0.as_mut();
        inner.poll_shutdown(ctx)
    }
}

View file

@ -25,6 +25,8 @@ use crate::{Deserialize, Serialize};
use roaring::RoaringBitmap;
use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
use crate::U32_LEN;
pub const BIT_SET: u8 = 0x80;
pub const BIT_CLEAR: u8 = 0;
@ -61,15 +63,22 @@ pub fn deserialize_bitmap(bytes: &[u8]) -> Option<RoaringBitmap> {
}
impl Deserialize for RoaringBitmap {
fn deserialize(bytes: &[u8]) -> Option<Self> {
match *bytes.first()? {
IS_BITMAP => deserialize_bitmap(bytes),
fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
match *bytes
.first()
.ok_or_else(|| crate::Error::InternalError("Empty bitmap".to_string()))?
{
IS_BITMAP => deserialize_bitmap(bytes).ok_or_else(|| {
crate::Error::InternalError("Failed to deserialize bitmap".to_string())
}),
IS_BITLIST => {
let mut bm = RoaringBitmap::new();
deserialize_bitlist(&mut bm, bytes);
Some(bm)
Ok(bm)
}
_ => None,
_ => Err(crate::Error::InternalError(
"Invalid bitmap type".to_string(),
)),
}
}
}
@ -201,7 +210,7 @@ pub fn bitmap_merge<'x>(
operands: impl IntoIterator<Item = &'x [u8]>,
) -> Option<Vec<u8>> {
let mut bm = match existing_val {
Some(existing_val) => RoaringBitmap::deserialize(existing_val)?,
Some(existing_val) => RoaringBitmap::deserialize(existing_val).ok()?,
None if operands_len == 1 => {
return Some(Vec::from(operands.into_iter().next().unwrap()));
}

View file

@ -0,0 +1,75 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::ops::Range;
use super::{RocksDbStore, CF_BLOB_DATA};
impl RocksDbStore {
pub(crate) async fn get_blob(
&self,
key: &[u8],
range: Range<u32>,
) -> crate::Result<Option<Vec<u8>>> {
let db = self.db.clone();
self.spawn_worker(move || {
db.get_pinned_cf(&db.cf_handle(CF_BLOB_DATA).unwrap(), key)
.map(|obj| {
obj.map(|bytes| {
if range.start == 0 && range.end == u32::MAX {
bytes.to_vec()
} else {
bytes
.get(
range.start as usize
..std::cmp::min(bytes.len(), range.end as usize),
)
.unwrap_or_default()
.to_vec()
}
})
})
.map_err(|e| crate::Error::InternalError(format!("Failed to fetch blob: {}", e)))
})
.await
}
pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> {
let db = self.db.clone();
self.spawn_worker(move || {
db.put_cf(&db.cf_handle(CF_BLOB_DATA).unwrap(), key, data)
.map_err(|e| crate::Error::InternalError(format!("Failed to insert blob: {}", e)))
})
.await
}
pub(crate) async fn delete_blob(&self, key: &[u8]) -> crate::Result<bool> {
let db = self.db.clone();
self.spawn_worker(move || {
db.delete_cf(&db.cf_handle(CF_BLOB_DATA).unwrap(), key)
.map_err(|e| crate::Error::InternalError(format!("Failed to delete blob: {}", e)))
.map(|_| true)
})
.await
}
}

View file

@ -1,146 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use rocksdb::{Direction, IteratorMode};
use crate::{
query::log::{Changes, Query},
write::key::DeserializeBigEndian,
Error, LogKey, Serialize, Store,
};
use super::CF_LOGS;
const CHANGE_ID_POS: usize = U32_LEN + std::mem::size_of::<u8>();
impl Store {
/// Returns the highest change id recorded for (`account_id`, `collection`),
/// or `None` when the changelog for that pair is empty.
///
/// Works by seeking to the largest possible key (`change_id = u64::MAX`)
/// and iterating backwards: the reverse iterator lands on the last existing
/// key at or before the seek target, which is the newest changelog entry
/// for this account/collection if one exists.
pub fn get_last_change_id(
    &self,
    account_id: u32,
    collection: impl Into<u8>,
) -> crate::Result<Option<u64>> {
    let collection = collection.into();
    let match_key = LogKey {
        account_id,
        collection,
        change_id: u64::MAX,
    }
    .serialize();
    if let Some(Ok((key, _))) = self
        .db
        .iterator_cf(
            &self.db.cf_handle(CF_LOGS).unwrap(),
            IteratorMode::From(&match_key, Direction::Reverse),
        )
        .next()
    {
        // The reverse seek may land on a key belonging to a different
        // account/collection; only accept it if the prefix matches.
        if key.starts_with(&match_key[0..CHANGE_ID_POS]) {
            return Ok(Some(
                key.as_ref()
                    .deserialize_be_u64(CHANGE_ID_POS)
                    .ok_or_else(|| {
                        Error::InternalError(format!(
                            "Failed to deserialize changelog key for [{}/{:?}]: [{:?}]",
                            account_id, collection, key
                        ))
                    })?,
            ));
        }
    }
    Ok(None)
}
/// Collects changelog entries for (`account_id`, `collection`) that match
/// `query`, merging them into a single `Changes` value.
///
/// The `(is_inclusive, from_change_id, to_change_id)` tuple encodes the
/// query bounds; a `0` upper bound means "unbounded".
pub fn get_changes(
    &self,
    account_id: u32,
    collection: impl Into<u8>,
    query: Query,
) -> crate::Result<Option<Changes>> {
    let collection = collection.into();
    let mut changelog = Changes::default();
    let (is_inclusive, from_change_id, to_change_id) = match query {
        Query::All => (true, 0, 0),
        Query::Since(change_id) => (false, change_id, 0),
        Query::SinceInclusive(change_id) => (true, change_id, 0),
        Query::RangeInclusive(from_change_id, to_change_id) => {
            (true, from_change_id, to_change_id)
        }
    };
    // Seek to the first possible entry of the range and scan forward while
    // the key still belongs to this account/collection prefix.
    let key = LogKey {
        account_id,
        collection,
        change_id: from_change_id,
    }
    .serialize();
    let prefix = &key[0..CHANGE_ID_POS];
    let mut is_first = true;
    for entry in self.db.iterator_cf(
        &self.db.cf_handle(CF_LOGS).unwrap(),
        IteratorMode::From(&key, Direction::Forward),
    ) {
        let (key, value) = entry?;
        if !key.starts_with(prefix) {
            break;
        }
        let change_id = key
            .as_ref()
            .deserialize_be_u64(CHANGE_ID_POS)
            .ok_or_else(|| {
                Error::InternalError(format!(
                    "Failed to deserialize changelog key for [{}/{:?}]: [{:?}]",
                    account_id, collection, key
                ))
            })?;
        // Skip the start entry itself for exclusive queries (`Query::Since`).
        if change_id > from_change_id || (is_inclusive && change_id == from_change_id) {
            if to_change_id > 0 && change_id > to_change_id {
                break;
            }
            if is_first {
                changelog.from_change_id = change_id;
                is_first = false;
            }
            changelog.to_change_id = change_id;
            changelog.deserialize(&value).ok_or_else(|| {
                Error::InternalError(format!(
                    "Failed to deserialize changelog for [{}/{:?}]: [{:?}]",
                    account_id, collection, query
                ))
            })?;
        }
    }
    // No entries matched: report the queried bounds back unchanged.
    if is_first {
        changelog.from_change_id = from_change_id;
        changelog.to_change_id = if to_change_id > 0 {
            to_change_id
        } else {
            from_change_id
        };
    }
    Ok(Some(changelog))
}
}

View file

@ -24,22 +24,29 @@
use std::path::PathBuf;
use roaring::RoaringBitmap;
use rocksdb::{ColumnFamilyDescriptor, MergeOperands, OptimisticTransactionDB, Options};
use rocksdb::{
compaction_filter::Decision, ColumnFamilyDescriptor, MergeOperands, OptimisticTransactionDB,
Options,
};
use crate::{Deserialize, Error, Store};
use tokio::sync::oneshot;
use utils::{config::Config, UnwrapFailure};
use super::{CF_BITMAPS, CF_BLOBS, CF_INDEXES, CF_LOGS, CF_VALUES};
use crate::{Deserialize, Error};
impl Store {
pub fn open() -> crate::Result<Self> {
use super::{
RocksDbStore, CF_BITMAPS, CF_BLOBS, CF_BLOB_DATA, CF_COUNTERS, CF_INDEXES, CF_INDEX_VALUES,
CF_LOGS, CF_VALUES,
};
impl RocksDbStore {
pub async fn open(config: &Config) -> crate::Result<Self> {
// Create the database directory if it doesn't exist
let path = PathBuf::from(
"/tmp/rocksdb_test", /*&settings
.get("db-path")
.unwrap_or_else(|| "/usr/local/stalwart-jmap/data".to_string())*/
let idx_path: PathBuf = PathBuf::from(
config
.value_require("store.db.path")
.failed("Invalid configuration file"),
);
let mut idx_path = path;
idx_path.push("idx");
std::fs::create_dir_all(&idx_path).map_err(|err| {
Error::InternalError(format!(
"Failed to create index directory {}: {:?}",
@ -48,64 +55,78 @@ impl Store {
))
})?;
let mut cfs = Vec::new();
// Bitmaps
let cf_bitmaps = {
let mut cf_opts = Options::default();
//cf_opts.set_max_write_buffer_number(16);
cf_opts.set_merge_operator("merge", bitmap_merge, bitmap_partial_merge);
cf_opts.set_compaction_filter("compact", bitmap_compact);
ColumnFamilyDescriptor::new(CF_BITMAPS, cf_opts)
};
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
cf_opts.set_merge_operator("merge", bitmap_merge, bitmap_partial_merge);
cf_opts.set_compaction_filter("compact", bitmap_compact);
cfs.push(ColumnFamilyDescriptor::new(CF_BITMAPS, cf_opts));
// Stored values
let cf_values = {
let mut cf_opts = Options::default();
cf_opts.set_merge_operator_associative("merge", numeric_value_merge);
ColumnFamilyDescriptor::new(CF_VALUES, cf_opts)
};
// Secondary indexes
let cf_indexes = {
let cf_opts = Options::default();
ColumnFamilyDescriptor::new(CF_INDEXES, cf_opts)
};
// Counters
let mut cf_opts = Options::default();
cf_opts.set_merge_operator_associative("merge", numeric_value_merge);
cfs.push(ColumnFamilyDescriptor::new(CF_COUNTERS, cf_opts));
// Blobs
let cf_blobs = {
let mut cf_opts = Options::default();
cf_opts.set_enable_blob_files(true);
cf_opts.set_min_blob_size(
16834, /*settings.parse("blob-min-size").unwrap_or(16384) */
);
ColumnFamilyDescriptor::new(CF_BLOBS, cf_opts)
};
let mut cf_opts = Options::default();
cf_opts.set_enable_blob_files(true);
cf_opts.set_min_blob_size(config.property_or_static("store.db.min-blob-size", "16834")?);
cfs.push(ColumnFamilyDescriptor::new(CF_BLOB_DATA, cf_opts));
// Raft log and change log
let cf_log = {
// Other cfs
for cf in [CF_BLOBS, CF_INDEXES, CF_INDEX_VALUES, CF_LOGS, CF_VALUES] {
let cf_opts = Options::default();
ColumnFamilyDescriptor::new(CF_LOGS, cf_opts)
};
cfs.push(ColumnFamilyDescriptor::new(cf, cf_opts));
}
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
db_opts.set_max_background_jobs(std::cmp::max(num_cpus::get() as i32, 3));
db_opts.set_write_buffer_size(
config.property_or_static("store.db.write-buffer-size", "134217728")?,
);
Ok(Store {
db: OptimisticTransactionDB::open_cf_descriptors(
&db_opts,
idx_path,
vec![cf_bitmaps, cf_values, cf_indexes, cf_blobs, cf_log],
)
.map_err(|e| Error::InternalError(e.into_string()))?,
Ok(RocksDbStore {
db: OptimisticTransactionDB::open_cf_descriptors(&db_opts, idx_path, cfs)
.map_err(|e| Error::InternalError(e.into_string()))?
.into(),
worker_pool: rayon::ThreadPoolBuilder::new()
.num_threads(
config
.property::<usize>("store.db.pool.workers")?
.filter(|v| *v > 0)
.unwrap_or_else(num_cpus::get),
)
.build()
.map_err(|err| {
crate::Error::InternalError(format!("Failed to build worker pool: {}", err))
})?,
})
}
pub fn close(&self) -> crate::Result<()> {
self.db
.flush()
.map_err(|e| Error::InternalError(e.to_string()))?;
self.db.cancel_all_background_work(true);
Ok(())
/// Runs `f` on the rayon worker pool and returns its result to the caller.
///
/// NOTE(review): `ThreadPool::scope` blocks the calling thread until the
/// spawned closure completes, so by the time `rx.await` is reached the
/// result has already been sent — the async executor thread is blocked for
/// the whole duration of `f`. Confirm this is intended; a non-scoped spawn
/// (or `tokio::task::spawn_blocking`) would avoid stalling the executor.
pub async fn spawn_worker<U, V>(&self, mut f: U) -> crate::Result<V>
where
    U: FnMut() -> crate::Result<V> + Send,
    V: Sync + Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    self.worker_pool.scope(|s| {
        s.spawn(|_| {
            // Receiver may have been dropped if the caller went away;
            // ignore the send error in that case.
            tx.send(f()).ok();
        });
    });
    match rx.await {
        Ok(result) => result,
        Err(err) => Err(crate::Error::InternalError(format!(
            "Worker thread failed: {}",
            err
        ))),
    }
}
}
@ -134,7 +155,7 @@ pub fn bitmap_merge(
existing_val: Option<&[u8]>,
operands: &MergeOperands,
) -> Option<Vec<u8>> {
super::bitmap::bitmap_merge(existing_val, operands.len(), operands.into_iter())
super::bitmap::bitmap_merge(existing_val, operands.len(), operands)
}
pub fn bitmap_partial_merge(
@ -146,13 +167,9 @@ pub fn bitmap_partial_merge(
None
}
pub fn bitmap_compact(
_level: u32,
_key: &[u8],
value: &[u8],
) -> rocksdb::compaction_filter::Decision {
pub fn bitmap_compact(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
match RoaringBitmap::deserialize(value) {
Some(bm) if bm.is_empty() => rocksdb::compaction_filter::Decision::Remove,
_ => rocksdb::compaction_filter::Decision::Keep,
Ok(bm) if bm.is_empty() => Decision::Remove,
_ => Decision::Keep,
}
}

View file

@ -21,119 +21,30 @@
* for more details.
*/
use std::sync::Arc;
use rocksdb::{MultiThreaded, OptimisticTransactionDB};
use crate::{
write::key::KeySerializer, AclKey, BitmapKey, BlobKey, IndexKey, LogKey, Serialize, ValueKey,
SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS, SUBSPACE_INDEXES,
SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES,
};
pub mod bitmap;
pub mod log;
pub mod blob;
pub mod main;
pub mod purge;
pub mod read;
pub mod write;
pub const CF_BITMAPS: &str = "b";
pub const CF_VALUES: &str = "v";
pub const CF_LOGS: &str = "l";
pub const CF_BLOBS: &str = "o";
pub const CF_INDEXES: &str = "i";
pub const COLLECTION_PREFIX_LEN: usize = U32_LEN + std::mem::size_of::<u8>();
pub const FIELD_PREFIX_LEN: usize = COLLECTION_PREFIX_LEN + std::mem::size_of::<u8>();
pub const ACCOUNT_KEY_LEN: usize = U32_LEN + std::mem::size_of::<u8>() + U32_LEN;
impl<T: AsRef<[u8]>> Serialize for IndexKey<T> {
    // Key layout: account_id | collection | field | key bytes | document_id.
    fn serialize(self) -> Vec<u8> {
        let key_bytes = self.key.as_ref();
        let capacity = std::mem::size_of::<IndexKey<T>>() + key_bytes.len();
        let serializer = KeySerializer::new(capacity)
            .write(self.account_id)
            .write(self.collection)
            .write(self.field)
            .write(key_bytes)
            .write(self.document_id);
        serializer.finalize()
    }
}
impl Serialize for ValueKey {
    // Plain keys (family == 0) are written without a family marker; keys
    // with a non-zero family carry a `u8::MAX` marker byte followed by the
    // family so the two kinds cannot collide.
    fn serialize(self) -> Vec<u8> {
        match self.family {
            0 => KeySerializer::new(std::mem::size_of::<ValueKey>())
                .write_leb128(self.account_id)
                .write(self.collection)
                .write_leb128(self.document_id)
                .write(self.field)
                .finalize(),
            family => KeySerializer::new(std::mem::size_of::<ValueKey>() + 1)
                .write_leb128(self.account_id)
                .write(self.collection)
                .write_leb128(self.document_id)
                .write(u8::MAX)
                .write(family)
                .write(self.field)
                .finalize(),
        }
    }
}
impl<T: AsRef<[u8]>> Serialize for BitmapKey<T> {
    // Key layout: leb128(account_id) | collection | family | field | key bytes.
    fn serialize(self) -> Vec<u8> {
        let bytes = self.key.as_ref();
        let ser = KeySerializer::new(std::mem::size_of::<BitmapKey<T>>() + bytes.len());
        ser.write_leb128(self.account_id)
            .write(self.collection)
            .write(self.family)
            .write(self.field)
            .write(bytes)
            .finalize()
    }
}
impl<T: AsRef<[u8]>> Serialize for BlobKey<T> {
    // The hash comes first so entries sort by blob hash, followed by
    // leb128(account_id) | collection | leb128(document_id).
    fn serialize(self) -> Vec<u8> {
        let digest = self.hash.as_ref();
        let ser = KeySerializer::new(std::mem::size_of::<BlobKey<T>>() + digest.len());
        ser.write(digest)
            .write_leb128(self.account_id)
            .write(self.collection)
            .write_leb128(self.document_id)
            .finalize()
    }
}
impl Serialize for AclKey {
    // Key layout: leb128(grant_account_id) | 0xFF marker |
    // leb128(to_account_id) | to_collection | leb128(to_document_id).
    fn serialize(self) -> Vec<u8> {
        let ser = KeySerializer::new(std::mem::size_of::<AclKey>());
        ser.write_leb128(self.grant_account_id)
            .write(u8::MAX)
            .write_leb128(self.to_account_id)
            .write(self.to_collection)
            .write_leb128(self.to_document_id)
            .finalize()
    }
}
impl Serialize for LogKey {
    // Key layout: account_id | collection | change_id. The changelog reader
    // decodes the trailing change_id as big-endian at CHANGE_ID_POS, so the
    // fixed-width encoding keeps entries in change-id order.
    fn serialize(self) -> Vec<u8> {
        let ser = KeySerializer::new(std::mem::size_of::<LogKey>());
        ser.write(self.account_id)
            .write(self.collection)
            .write(self.change_id)
            .finalize()
    }
}
impl BloomHash {
    /// Builds the bitmap lookup key for this hash's high-rank bits:
    /// leb128(account_id) | collection | BM_BLOOM | field | high-rank hash.
    pub fn to_high_rank_key(&self, account_id: u32, collection: u8, field: u8) -> Vec<u8> {
        let ser = KeySerializer::new(std::mem::size_of::<BitmapKey<&[u8]>>() + 2);
        ser.write_leb128(account_id)
            .write(collection)
            .write(BM_BLOOM)
            .write(field)
            .write(self.as_high_rank_hash())
            .finalize()
    }
}
// Column-family names are the one-byte subspace identifiers reinterpreted
// as &str, so the same byte addresses both the key subspace and the CF.
// SAFETY: a single-byte slice is valid UTF-8 iff the byte is ASCII (< 0x80).
// NOTE(review): this assumes every SUBSPACE_* constant is an ASCII byte —
// confirm where the constants are declared.
static CF_BITMAPS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_BITMAPS]) };
static CF_VALUES: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_VALUES]) };
static CF_LOGS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_LOGS]) };
static CF_INDEXES: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_INDEXES]) };
static CF_BLOBS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_BLOBS]) };
static CF_BLOB_DATA: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_BLOB_DATA]) };
static CF_INDEX_VALUES: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_INDEX_VALUES]) };
static CF_COUNTERS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_COUNTERS]) };
impl From<rocksdb::Error> for crate::Error {
fn from(value: rocksdb::Error) -> Self {
@ -141,7 +52,7 @@ impl From<rocksdb::Error> for crate::Error {
}
}
#[cfg(feature = "rocks")]
pub struct Store {
db: rocksdb::OptimisticTransactionDB<rocksdb::MultiThreaded>,
pub struct RocksDbStore {
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
worker_pool: rayon::ThreadPool,
}

View file

@ -0,0 +1,63 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use rocksdb::{Direction, IteratorMode};
use crate::{write::key::KeySerializer, U32_LEN};
use super::{RocksDbStore, CF_BITMAPS, CF_INDEXES, CF_LOGS, CF_VALUES};
impl RocksDbStore {
/// No-op for the RocksDB backend: empty bitmaps are dropped during
/// compaction by the `bitmap_compact` filter registered on the bitmaps
/// column family, so there is nothing to purge explicitly here.
pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
    Ok(())
}
/// Removes every key belonging to `account_id` from the bitmap, value,
/// log and index column families.
pub(crate) async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
    let db = self.db.clone();
    self.spawn_worker(move || {
        let prefix = KeySerializer::new(U32_LEN).write(account_id).finalize();
        // TODO use delete_range when implemented (see https://github.com/rust-rocksdb/rust-rocksdb/issues/839)
        for cf_name in [CF_BITMAPS, CF_VALUES, CF_LOGS, CF_INDEXES] {
            let cf = db.cf_handle(cf_name).unwrap();
            // Collect the matching keys first, then delete them, since
            // delete_range is not yet available in the bindings.
            let mut matches = Vec::new();
            for row in db.iterator_cf(&cf, IteratorMode::From(&prefix, Direction::Forward)) {
                let (row_key, _) = row?;
                if row_key.starts_with(&prefix) {
                    matches.push(row_key);
                } else {
                    break;
                }
            }
            for row_key in matches {
                db.delete_cf(&cf, &row_key)?;
            }
        }
        Ok(())
    })
    .await
}
}

View file

@ -21,210 +21,343 @@
* for more details.
*/
use std::ops::{BitAndAssign, BitOrAssign};
use roaring::RoaringBitmap;
use rocksdb::{Direction, IteratorMode};
use crate::{
query::Operator, write::key::DeserializeBigEndian, BitmapKey, Deserialize, Error, Serialize,
Store, BM_DOCUMENT_IDS,
query::{self, Operator},
write::{
key::{DeserializeBigEndian, KeySerializer},
BitmapClass, ValueClass,
},
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, IterateParams, Key, ValueKey, U32_LEN,
};
use super::{CF_BITMAPS, CF_INDEXES, CF_VALUES, FIELD_PREFIX_LEN};
use super::{RocksDbStore, CF_BITMAPS, CF_COUNTERS, CF_INDEXES, CF_VALUES};
impl Store {
#[inline(always)]
pub fn get_value<U>(&self, key: impl Serialize) -> crate::Result<Option<U>>
const INDEX_PREFIX_LEN: usize = U32_LEN + 2;
impl RocksDbStore {
pub(crate) async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>>
where
U: Deserialize,
U: Deserialize + 'static,
{
let key = key.serialize();
if let Some(bytes) = self
.db
.get_pinned_cf(&self.db.cf_handle(CF_VALUES).unwrap(), &key)
.map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
{
Ok(Some(U::deserialize(&bytes).ok_or_else(|| {
Error::InternalError(format!("Failed to deserialize key: {:?}", key))
})?))
} else {
Ok(None)
}
}
#[inline(always)]
pub fn get_values<U>(&self, keys: Vec<impl Serialize>) -> crate::Result<Vec<Option<U>>>
where
U: Deserialize,
{
let cf_handle = self.db.cf_handle(CF_VALUES).unwrap();
let mut results = Vec::with_capacity(keys.len());
for value in self.db.multi_get_cf(
keys.into_iter()
.map(|key| (&cf_handle, key.serialize()))
.collect::<Vec<_>>(),
) {
results.push(
if let Some(bytes) = value
.map_err(|err| Error::InternalError(format!("multi_get_cf failed: {}", err)))?
{
U::deserialize(&bytes)
.ok_or_else(|| {
Error::InternalError("Failed to deserialize keys.".to_string())
})?
.into()
} else {
None
},
);
}
Ok(results)
}
#[inline(always)]
pub fn get_bitmap<T: AsRef<[u8]>>(
&self,
key: BitmapKey,
) -> crate::Result<Option<RoaringBitmap>> {
let key = key.serialize();
if let Some(bytes) = self
.db
.get_pinned_cf(&self.db.cf_handle(CF_BITMAPS).unwrap(), &key)
.map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
{
let bm = RoaringBitmap::deserialize(&bytes).ok_or_else(|| {
Error::InternalError(format!("Failed to deserialize key: {:?}", &key))
})?;
Ok(if !bm.is_empty() { Some(bm) } else { None })
} else {
Ok(None)
}
}
#[inline(always)]
fn get_bitmaps<T: Serialize>(&self, keys: Vec<T>) -> crate::Result<Vec<Option<RoaringBitmap>>> {
let cf_handle = self.db.cf_handle(CF_BITMAPS).unwrap();
let mut results = Vec::with_capacity(keys.len());
for value in self.db.multi_get_cf(
keys.into_iter()
.map(|key| (&cf_handle, key.serialize()))
.collect::<Vec<_>>(),
) {
results.push(
if let Some(bytes) = value
.map_err(|err| Error::InternalError(format!("multi_get_cf failed: {}", err)))?
{
RoaringBitmap::deserialize(&bytes)
.ok_or_else(|| {
Error::InternalError("Failed to deserialize keys.".to_string())
})?
.into()
} else {
None
},
);
}
Ok(results)
}
pub(crate) fn get_bitmaps_intersection<T: Serialize>(
&self,
keys: Vec<T>,
) -> crate::Result<Option<RoaringBitmap>> {
let mut result: Option<RoaringBitmap> = None;
for bitmap in self.get_bitmaps(keys)? {
if let Some(bitmap) = bitmap {
if let Some(result) = &mut result {
result.bitand_assign(&bitmap);
if result.is_empty() {
break;
let db = self.db.clone();
self.spawn_worker(move || {
db.get_pinned_cf(&db.cf_handle(CF_VALUES).unwrap(), &key.serialize(false))
.map_err(Into::into)
.and_then(|value| {
if let Some(value) = value {
U::deserialize(&value).map(Some)
} else {
Ok(None)
}
} else {
result = Some(bitmap);
}
} else {
return Ok(None);
}
}
Ok(result)
})
})
.await
}
pub(crate) fn get_bitmaps_union<T: Serialize>(
pub(crate) async fn get_bitmap(
&self,
keys: Vec<T>,
key: BitmapKey<BitmapClass>,
) -> crate::Result<Option<RoaringBitmap>> {
let mut result: Option<RoaringBitmap> = None;
for bitmap in (self.get_bitmaps(keys)?).into_iter().flatten() {
if let Some(result) = &mut result {
result.bitor_assign(&bitmap);
} else {
result = Some(bitmap);
}
}
Ok(result)
let db = self.db.clone();
self.spawn_worker(move || {
db.get_pinned_cf(&db.cf_handle(CF_BITMAPS).unwrap(), &key.serialize(false))
.map_err(Into::into)
.and_then(|value| {
if let Some(value) = value {
RoaringBitmap::deserialize(&value).map(|rb| {
if !rb.is_empty() {
Some(rb)
} else {
None
}
})
} else {
Ok(None)
}
})
})
.await
}
pub(crate) fn range_to_bitmap(
pub(crate) async fn range_to_bitmap(
&self,
match_key: &[u8],
account_id: u32,
collection: u8,
field: u8,
match_value: &[u8],
op: Operator,
op: query::Operator,
) -> crate::Result<Option<RoaringBitmap>> {
let mut bm = RoaringBitmap::new();
let match_prefix = &match_key[0..FIELD_PREFIX_LEN];
for result in self.db.iterator_cf(
&self.db.cf_handle(CF_INDEXES).unwrap(),
IteratorMode::From(
match_key,
let db = self.db.clone();
self.spawn_worker(move || {
let prefix =
KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + match_value.len() + 1)
.write(account_id)
.write(collection)
.write(field)
.write(match_value)
.finalize();
let match_prefix = &prefix[0..INDEX_PREFIX_LEN];
let mut bm = RoaringBitmap::new();
let it_mode = IteratorMode::From(
&prefix,
match op {
Operator::GreaterThan | Operator::GreaterEqualThan | Operator::Equal => {
Direction::Forward
}
_ => Direction::Reverse,
},
),
) {
let (key, _) = result
.map_err(|err| Error::InternalError(format!("iterator_cf failed: {}", err)))?;
if !key.starts_with(match_prefix) {
break;
}
let doc_id_pos = key.len() - U32_LEN;
let value = key.get(FIELD_PREFIX_LEN..doc_id_pos).ok_or_else(|| {
Error::InternalError("Invalid key found in 'indexes' column family.".to_string())
})?;
);
match op {
Operator::LowerThan if value >= match_value => {
if value == match_value {
continue;
} else {
break;
}
for row in db.iterator_cf(&self.db.cf_handle(CF_INDEXES).unwrap(), it_mode) {
let (key, _) = row?;
if !key.starts_with(match_prefix) {
break;
}
Operator::LowerEqualThan if value > match_value => break,
Operator::GreaterThan if value <= match_value => {
if value == match_value {
continue;
} else {
break;
}
}
Operator::GreaterEqualThan if value < match_value => break,
Operator::Equal if value != match_value => break,
_ => {
bm.insert(key.as_ref().deserialize_be_u32(doc_id_pos).ok_or_else(|| {
Error::InternalError(
let value = key
.get(INDEX_PREFIX_LEN..key.len() - U32_LEN)
.ok_or_else(|| {
crate::Error::InternalError(
"Invalid key found in 'indexes' column family.".to_string(),
)
})?);
})?;
match op {
Operator::LowerThan if value >= match_value => {
if value == match_value {
continue;
} else {
break;
}
}
Operator::LowerEqualThan if value > match_value => break,
Operator::GreaterThan if value <= match_value => {
if value == match_value {
continue;
} else {
break;
}
}
Operator::GreaterEqualThan if value < match_value => break,
Operator::Equal if value != match_value => break,
_ => {
bm.insert(key.as_ref().deserialize_be_u32(key.len() - U32_LEN)?);
}
}
}
}
Ok(Some(bm))
Ok(Some(bm))
})
.await
}
/// Iterates the secondary index for (`account_id`, `collection`, `field`)
/// and invokes `cb` with each indexed value slice and its document id, in
/// `ascending` or descending key order, stopping early when `cb` returns
/// `Ok(false)` or the field's key range is exhausted.
pub(crate) async fn sort_index(
    &self,
    account_id: u32,
    collection: impl Into<u8> + Sync + Send,
    field: impl Into<u8> + Sync + Send,
    ascending: bool,
    mut cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send,
) -> crate::Result<()> {
    let collection = collection.into();
    let field = field.into();
    let db = self.db.clone();
    self.spawn_worker(move || {
        let prefix = IndexKeyPrefix {
            account_id,
            collection,
            field,
        }
        .serialize(false);
        let prefix_rev;
        let it_mode = if ascending {
            IteratorMode::From(&prefix, Direction::Forward)
        } else {
            // Seek to the start of the *next* field and walk backwards so the
            // reverse scan begins at the last key of this field's range.
            // NOTE(review): assumes `field < u8::MAX` — `field + 1` would
            // overflow otherwise; confirm field ids never reach 255.
            prefix_rev = IndexKeyPrefix {
                account_id,
                collection,
                field: field + 1,
            }
            .serialize(false);
            IteratorMode::From(&prefix_rev, Direction::Reverse)
        };
        // Use the cloned `db` handle rather than `self.db`: capturing `self`
        // in the worker closure defeats the purpose of the clone above.
        for row in db.iterator_cf(&db.cf_handle(CF_INDEXES).unwrap(), it_mode) {
            let (key, _) = row?;
            if !key.starts_with(&prefix) {
                break;
            }
            // Each key ends with the big-endian document id.
            let id_pos = key.len() - U32_LEN;
            if !cb(
                key.get(INDEX_PREFIX_LEN..id_pos).ok_or_else(|| {
                    crate::Error::InternalError("Invalid key found in index".to_string())
                })?,
                key.as_ref().deserialize_be_u32(id_pos)?,
            )? {
                return Ok(());
            }
        }
        Ok(())
    })
    .await
}
/// Streams `(key, value)` pairs in the inclusive range
/// [`params.begin`, `params.end`] from the column family named by the
/// begin key's subspace byte, calling `cb` for each pair until `cb`
/// returns `Ok(false)`, the range is exhausted, or (`params.first`) a
/// single match has been delivered. Direction follows `params.ascending`.
pub(crate) async fn iterate<T: Key>(
    &self,
    params: IterateParams<T>,
    mut cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> crate::Result<bool> + Sync + Send,
) -> crate::Result<()> {
    let db = self.db.clone();
    self.spawn_worker(move || {
        // The subspace byte doubles as the column-family name.
        let cf = db
            .cf_handle(std::str::from_utf8(&[params.begin.subspace()]).unwrap())
            .unwrap();
        let begin = params.begin.serialize(false);
        let end = params.end.serialize(false);
        let it_mode = if params.ascending {
            IteratorMode::From(&begin, Direction::Forward)
        } else {
            IteratorMode::From(&end, Direction::Reverse)
        };
        for row in db.iterator_cf(&cf, it_mode) {
            let (key, value) = row?;
            // Short-circuit order matters: `cb` is only invoked for keys
            // inside [begin, end], and `params.first` stops the scan after
            // the first callback invocation.
            if key.as_ref() < begin.as_slice()
                || key.as_ref() > end.as_slice()
                || !cb(&key, &value)?
                || params.first
            {
                break;
            }
        }
        Ok(())
    })
    .await
}
/// Returns the current value of a counter key, or `0` when the key is not
/// present in the counters column family.
pub(crate) async fn get_counter(
    &self,
    key: impl Into<ValueKey<ValueClass>> + Sync + Send,
) -> crate::Result<i64> {
    let key = key.into().serialize(false);
    let db = self.db.clone();
    self.spawn_worker(move || {
        let bytes = db.get_pinned_cf(&db.cf_handle(CF_COUNTERS).unwrap(), &key)?;
        match bytes {
            Some(bytes) => {
                // Counters are stored as a little-endian i64 (maintained by
                // the merge operator registered on this column family).
                let raw: [u8; 8] = bytes[..].try_into().map_err(|_| {
                    crate::Error::InternalError("Invalid counter value.".to_string())
                })?;
                Ok(i64::from_le_bytes(raw))
            }
            None => Ok(0),
        }
    })
    .await
}
#[cfg(feature = "test_mode")]
/// Test-only invariant check: panics if any column family still holds live
/// data after a test run. Changelog entries are tolerated and deleted;
/// sentinel rows keyed by `u32::MAX` (id counters/mappings) are ignored.
pub(crate) async fn assert_is_empty(&self) {
    use super::CF_LOGS;
    let db = self.db.clone();
    self.spawn_worker(move || {
        let mut delete_keys = Vec::new();
        for cf_name in [
            super::CF_BITMAPS,
            super::CF_VALUES,
            super::CF_INDEX_VALUES,
            super::CF_COUNTERS,
            super::CF_BLOB_DATA,
            super::CF_INDEXES,
            super::CF_BLOBS,
            super::CF_LOGS,
        ] {
            let cf = db.cf_handle(cf_name).unwrap();
            for row in db.iterator_cf(&cf, IteratorMode::Start) {
                let (key_, value_) = row.unwrap();
                let (key, value) = (key_.as_ref(), value_.as_ref());
                if cf_name == super::CF_BITMAPS {
                    // Bitmaps under the u32::MAX account prefix are internal;
                    // any other non-empty bitmap is leaked test data.
                    if key[0..4] != u32::MAX.to_be_bytes() {
                        let bm = RoaringBitmap::deserialize(value).unwrap();
                        if !bm.is_empty() {
                            panic!(
                                concat!(
                                    "Table bitmaps is not empty, account {}, ",
                                    "collection {}, family {}, field {}, key {:?}: {:?}"
                                ),
                                u32::from_be_bytes(key[0..4].try_into().unwrap()),
                                key[4],
                                key[5],
                                key[6],
                                key,
                                bm
                            );
                        }
                    }
                } else if cf_name == super::CF_VALUES {
                    // Ignore lastId counter and ID mappings
                    if key[0..4] == u32::MAX.to_be_bytes() {
                        continue;
                    }
                    panic!("Table values is not empty: {key:?} {value:?}");
                } else if cf_name == super::CF_COUNTERS {
                    // Counters must have merged back to exactly zero.
                    let value = i64::from_le_bytes(value[..].try_into().unwrap());
                    if value != 0 {
                        panic!(
                            "Table counter is not empty, account {:?}, quota: {}",
                            key, value,
                        );
                    }
                } else if cf_name == super::CF_INDEX_VALUES
                    || cf_name == super::CF_BLOB_DATA
                    || cf_name == super::CF_BLOBS
                {
                    panic!("Subspace {cf_name:?} is not empty: {key:?} {value:?}",);
                } else if cf_name == super::CF_INDEXES {
                    panic!(
                        concat!(
                            "Table index is not empty, account {}, collection {}, ",
                            "document {}, property {}, value {:?}: {:?}"
                        ),
                        u32::from_be_bytes(key[0..4].try_into().unwrap()),
                        key[4],
                        u32::from_be_bytes(key[key.len() - 4..].try_into().unwrap()),
                        key[5],
                        String::from_utf8_lossy(&key[6..key.len() - 4]),
                        key
                    );
                } else if cf_name == super::CF_LOGS {
                    // Changelog rows are expected; queue them for cleanup.
                    delete_keys.push(key.to_vec());
                } else {
                    panic!("Unknown column family: {}", cf_name);
                }
            }
        }
        // Delete logs
        let cf = db.cf_handle(CF_LOGS).unwrap();
        for key in delete_keys {
            db.delete_cf(&cf, &key).unwrap();
        }
        Ok(())
    })
    .await
    .unwrap();
}
}

View file

@ -21,235 +21,237 @@
* for more details.
*/
use std::time::Instant;
use roaring::RoaringBitmap;
use rocksdb::ErrorKind;
use utils::map::vec_map::VecMap;
use crate::{
write::{AccountCollection, Batch, Operation, WriteResult},
AclKey, BitmapKey, BlobKey, Deserialize, Error, IndexKey, LogKey, Serialize, Store, ValueKey,
BM_DOCUMENT_IDS, UNASSIGNED_ID,
use std::{
thread::sleep,
time::{Duration, Instant},
};
use rand::Rng;
use rocksdb::ErrorKind;
use super::{
bitmap::{clear_bit, set_bit},
CF_BITMAPS, CF_BLOBS, CF_INDEXES, CF_LOGS, CF_VALUES,
RocksDbStore, CF_BITMAPS, CF_BLOBS, CF_BLOB_DATA, CF_COUNTERS, CF_INDEXES, CF_INDEX_VALUES,
CF_LOGS, CF_VALUES,
};
use crate::{
write::{Batch, Operation, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME},
BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_INDEX_VALUES, SUBSPACE_VALUES,
};
impl Store {
pub fn write(&self, batch: Batch) -> crate::Result<WriteResult> {
let cf_values = self.db.cf_handle(CF_VALUES).unwrap();
let cf_bitmaps = self.db.cf_handle(CF_BITMAPS).unwrap();
let cf_indexes = self.db.cf_handle(CF_INDEXES).unwrap();
let cf_logs = self.db.cf_handle(CF_LOGS).unwrap();
let cf_blobs = self.db.cf_handle(CF_BLOBS).unwrap();
let start = Instant::now();
impl RocksDbStore {
pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> {
let db = self.db.clone();
loop {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
let mut result = WriteResult {
change_ids: VecMap::new(),
assigned_ids: VecMap::new(),
};
let txn = self.db.transaction();
let mut wb = txn.get_writebatch();
self.spawn_worker(move || {
let start = Instant::now();
let mut retry_count = 0;
for op in &batch.ops {
match op {
Operation::AccountId {
account_id: account_id_,
} => {
account_id = *account_id_;
}
Operation::Collection {
collection: collection_,
} => {
collection = *collection_;
}
Operation::DocumentId {
document_id: document_id_,
set,
} => {
if *document_id_ == UNASSIGNED_ID {
let cf_bitmaps = db.cf_handle(CF_BITMAPS).unwrap();
let cf_values = db.cf_handle(CF_VALUES).unwrap();
let cf_indexes = db.cf_handle(CF_INDEXES).unwrap();
let cf_logs = db.cf_handle(CF_LOGS).unwrap();
let cf_blobs = db.cf_handle(CF_BLOBS).unwrap();
let cf_index_values = db.cf_handle(CF_INDEX_VALUES).unwrap();
let cf_counters = db.cf_handle(CF_COUNTERS).unwrap();
loop {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
let txn = self.db.transaction();
let mut wb = txn.get_writebatch();
for op in &batch.ops {
match op {
Operation::AccountId {
account_id: account_id_,
} => {
account_id = *account_id_;
}
Operation::Collection {
collection: collection_,
} => {
collection = *collection_;
}
Operation::DocumentId {
document_id: document_id_,
} => {
document_id = *document_id_;
}
Operation::Value {
class,
op: ValueOp::Add(by),
} => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
}
.serialize(false);
wb.merge_cf(&cf_counters, &key, &by.to_le_bytes()[..]);
}
Operation::Value { class, op } => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
};
let cf = match key.subspace() {
SUBSPACE_VALUES => &cf_values,
SUBSPACE_INDEX_VALUES => &cf_index_values,
_ => unreachable!(),
};
let key = key.serialize(false);
if let ValueOp::Set(value) = op {
wb.put_cf(cf, &key, value);
} else {
wb.delete_cf(cf, &key);
}
}
Operation::Index { field, key, set } => {
let key = IndexKey {
account_id,
collection,
document_id,
field: *field,
key,
}
.serialize(false);
if *set {
wb.put_cf(&cf_indexes, &key, []);
} else {
wb.delete_cf(&cf_indexes, &key);
}
}
Operation::Bitmap { class, set } => {
let key = BitmapKey {
account_id,
collection,
family: BM_DOCUMENT_IDS,
field: u8::MAX,
key: b"",
class,
block_num: 0,
}
.serialize();
let mut document_ids = if let Some(bytes) = txn
.get_pinned_for_update_cf(&cf_bitmaps, &key, true)
.map_err(|err| {
Error::InternalError(format!("get_cf failed: {}", err))
})? {
RoaringBitmap::deserialize(&bytes).ok_or_else(|| {
Error::InternalError(format!(
"Failed to deserialize key: {:?}",
key
))
})?
.serialize(false);
let value = if *set {
set_bit(document_id)
} else {
RoaringBitmap::new()
clear_bit(document_id)
};
document_id = if let Some(max_id) = document_ids.max() {
let mask = if max_id < 20000 {
RoaringBitmap::from_sorted_iter(0..max_id + 2).unwrap()
} else {
RoaringBitmap::full()
};
document_ids ^= mask;
document_ids.min().unwrap()
wb.merge_cf(&cf_bitmaps, key, value);
}
Operation::Blob { hash, op, set } => {
let key = BlobKey {
account_id,
collection,
document_id,
hash,
op: *op,
}
.serialize(false);
if *set {
wb.put_cf(&cf_blobs, &key, []);
} else {
0
};
result
.assigned_ids
.append((account_id, collection).into(), document_id);
wb.merge_cf(&cf_bitmaps, key, set_bit(document_id));
} else {
document_id = *document_id_;
if !*set {
wb.merge_cf(
&cf_bitmaps,
BitmapKey {
account_id,
collection,
family: BM_DOCUMENT_IDS,
field: u8::MAX,
key: b"",
}
.serialize(),
clear_bit(document_id),
);
wb.delete_cf(&cf_blobs, &key);
}
}
}
Operation::Value { family, field, set } => {
let key = ValueKey {
account_id,
Operation::Log {
collection,
document_id,
family: *family,
field: *field,
}
.serialize();
if let Some(value) = set {
wb.put_cf(&cf_values, key, value);
} else {
wb.delete_cf(&cf_values, key);
}
}
Operation::Index { field, key, set } => {
let key_ = IndexKey {
account_id,
collection,
document_id,
field: *field,
key,
}
.serialize();
if *set {
wb.put_cf(&cf_indexes, key_, []);
} else {
wb.delete_cf(&cf_indexes, key_);
}
}
Operation::Bitmap {
family,
field,
key,
set,
} => {
let key = BitmapKey {
account_id,
collection,
family: *family,
field: *field,
key,
}
.serialize();
let value = if *set {
set_bit(document_id)
} else {
clear_bit(document_id)
};
wb.merge_cf(&cf_bitmaps, key, value);
}
Operation::Blob { key, set } => {
let key = BlobKey {
account_id,
collection,
document_id,
hash: key,
}
.serialize();
if *set {
wb.put_cf(&cf_blobs, key, []);
} else {
wb.delete_cf(&cf_blobs, key);
}
}
Operation::Acl {
grant_account_id,
set,
} => {
let key = AclKey {
grant_account_id: *grant_account_id,
to_account_id: account_id,
to_collection: collection,
to_document_id: document_id,
}
.serialize();
if let Some(value) = set {
wb.put_cf(&cf_values, key, value);
} else {
wb.delete_cf(&cf_values, key);
}
}
Operation::Log {
collection,
changes,
} => {
let ac: AccountCollection = (account_id, *collection).into();
let coco = "read for write";
let change_id = self
.get_last_change_id(account_id, *collection)?
.map(|id| id + 1)
.unwrap_or(0);
let key = LogKey {
account_id,
collection: *collection,
change_id,
set,
} => {
let key = LogKey {
account_id,
collection: *collection,
change_id: *change_id,
}
.serialize(false);
wb.put_cf(&cf_logs, &key, set);
}
Operation::AssertValue {
class,
assert_value,
} => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
};
let cf = match key.subspace() {
SUBSPACE_VALUES => &cf_values,
SUBSPACE_INDEX_VALUES => &cf_index_values,
_ => unreachable!(),
};
let key = key.serialize(false);
let matches = txn
.get_cf(cf, &key)?
.map(|value| assert_value.matches(&value))
.unwrap_or_else(|| assert_value.is_none());
if !matches {
return Err(crate::Error::AssertValueFailed);
}
}
.serialize();
wb.put_cf(
&cf_logs,
key,
changes.serialize(
result.assigned_ids.get(&ac).copied().unwrap_or_default(),
),
);
result.change_ids.append(ac, change_id);
}
}
match db.write(wb) {
Ok(_) => {
return Ok(());
}
Err(err) => match err.kind() {
ErrorKind::Busy | ErrorKind::MergeInProgress | ErrorKind::TryAgain
if retry_count < MAX_COMMIT_ATTEMPTS
&& start.elapsed() < MAX_COMMIT_TIME =>
{
let backoff = rand::thread_rng().gen_range(50..=300);
sleep(Duration::from_millis(backoff));
retry_count += 1;
}
_ => return Err(err.into()),
},
}
}
})
.await
}
match self.db.write(wb) {
Ok(_) => {
//println!("Success with id {}", document_id);
return Ok(result);
}
Err(err) => match err.kind() {
ErrorKind::Busy | ErrorKind::MergeInProgress | ErrorKind::TryAgain
if start.elapsed().as_secs() < 5 => {}
_ => return Err(err.into()),
},
#[cfg(feature = "test_mode")]
pub(crate) async fn destroy(&self) {
use rocksdb::IteratorMode;
self.db.cancel_all_background_work(false);
for cf_name in [
CF_VALUES,
CF_LOGS,
CF_BITMAPS,
CF_INDEXES,
CF_BLOBS,
CF_INDEX_VALUES,
CF_COUNTERS,
CF_BLOB_DATA,
] {
let mut delete_keys = Vec::new();
let it_mode = IteratorMode::Start;
let cf = self.db.cf_handle(cf_name).unwrap();
for row in self.db.iterator_cf(&cf, it_mode) {
let (k, _) = row.unwrap();
delete_keys.push(k);
}
for k in delete_keys {
self.db.delete_cf(&cf, &k).unwrap();
}
}
}

View file

@ -41,10 +41,16 @@ impl Store {
U: Deserialize + 'static,
{
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.get_value(key).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.get_value(key).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.get_value(key).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.get_value(key).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.get_value(key).await,
}
}
@ -66,10 +72,16 @@ impl Store {
key: BitmapKey<BitmapClass>,
) -> crate::Result<Option<RoaringBitmap>> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.get_bitmap(key).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.get_bitmap(key).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.get_bitmap(key).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.get_bitmap(key).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.get_bitmap(key).await,
}
}
@ -104,26 +116,36 @@ impl Store {
op: query::Operator,
) -> crate::Result<Option<RoaringBitmap>> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => {
store
.range_to_bitmap(account_id, collection, field, value, op)
.await
}
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => {
store
.range_to_bitmap(account_id, collection, field, value, op)
.await
}
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => {
store
.range_to_bitmap(account_id, collection, field, value, op)
.await
}
#[cfg(feature = "mysql")]
Self::MySQL(store) => {
store
.range_to_bitmap(account_id, collection, field, value, op)
.await
}
#[cfg(feature = "rocks")]
Self::RocksDb(store) => {
store
.range_to_bitmap(account_id, collection, field, value, op)
.await
}
}
}
@ -136,26 +158,36 @@ impl Store {
cb: impl for<'x> FnMut(&'x [u8], u32) -> crate::Result<bool> + Sync + Send,
) -> crate::Result<()> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => {
store
.sort_index(account_id, collection, field, ascending, cb)
.await
}
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => {
store
.sort_index(account_id, collection, field, ascending, cb)
.await
}
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => {
store
.sort_index(account_id, collection, field, ascending, cb)
.await
}
#[cfg(feature = "mysql")]
Self::MySQL(store) => {
store
.sort_index(account_id, collection, field, ascending, cb)
.await
}
#[cfg(feature = "rocks")]
Self::RocksDb(store) => {
store
.sort_index(account_id, collection, field, ascending, cb)
.await
}
}
}
@ -165,10 +197,16 @@ impl Store {
cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> crate::Result<bool> + Sync + Send,
) -> crate::Result<()> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.iterate(params, cb).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.iterate(params, cb).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.iterate(params, cb).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.iterate(params, cb).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.iterate(params, cb).await,
}
}
@ -177,73 +215,121 @@ impl Store {
key: impl Into<ValueKey<ValueClass>> + Sync + Send,
) -> crate::Result<i64> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.get_counter(key).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.get_counter(key).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.get_counter(key).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.get_counter(key).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.get_counter(key).await,
}
}
pub async fn write(&self, batch: Batch) -> crate::Result<()> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.write(batch).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.write(batch).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.write(batch).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.write(batch).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.write(batch).await,
}
}
pub async fn purge_bitmaps(&self) -> crate::Result<()> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.purge_bitmaps().await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.purge_bitmaps().await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.purge_bitmaps().await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.purge_bitmaps().await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.purge_bitmaps().await,
}
}
pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.purge_account(account_id).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.purge_account(account_id).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.purge_account(account_id).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.purge_account(account_id).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.purge_account(account_id).await,
}
}
pub async fn get_blob(&self, key: &[u8], range: Range<u32>) -> crate::Result<Option<Vec<u8>>> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.get_blob(key, range).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.get_blob(key, range).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.get_blob(key, range).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.get_blob(key, range).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.get_blob(key, range).await,
}
}
pub async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.put_blob(key, data).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.put_blob(key, data).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.put_blob(key, data).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.put_blob(key, data).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.put_blob(key, data).await,
}
}
pub async fn delete_blob(&self, key: &[u8]) -> crate::Result<bool> {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.delete_blob(key).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.delete_blob(key).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.delete_blob(key).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.delete_blob(key).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.delete_blob(key).await,
}
}
#[cfg(feature = "test_mode")]
pub async fn destroy(&self) {
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.destroy().await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.destroy().await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.destroy().await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.destroy().await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.destroy().await,
}
}
@ -307,10 +393,16 @@ impl Store {
self.purge_bitmaps().await.unwrap();
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.assert_is_empty().await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.assert_is_empty().await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.assert_is_empty().await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.assert_is_empty().await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.assert_is_empty().await,
}
}
}
@ -320,10 +412,16 @@ impl BlobStore {
match self {
Self::Fs(store) => store.get_blob(key, range).await,
Self::S3(store) => store.get_blob(key, range).await,
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.get_blob(key, range).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.get_blob(key, range).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.get_blob(key, range).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.get_blob(key, range).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.get_blob(key, range).await,
}
}
@ -331,10 +429,16 @@ impl BlobStore {
match self {
Self::Fs(store) => store.put_blob(key, data).await,
Self::S3(store) => store.put_blob(key, data).await,
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.put_blob(key, data).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.put_blob(key, data).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.put_blob(key, data).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.put_blob(key, data).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.put_blob(key, data).await,
}
}
@ -342,10 +446,16 @@ impl BlobStore {
match self {
Self::Fs(store) => store.delete_blob(key).await,
Self::S3(store) => store.delete_blob(key).await,
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.delete_blob(key).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.delete_blob(key).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.delete_blob(key).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.delete_blob(key).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.delete_blob(key).await,
}
}
}

View file

@ -30,16 +30,28 @@ pub mod query;
pub mod write;
pub use ahash;
use backend::{
foundationdb::FdbStore, fs::FsStore, mysql::MysqlStore, postgres::PostgresStore, s3::S3Store,
sqlite::SqliteStore,
};
use backend::{fs::FsStore, s3::S3Store};
pub use blake3;
pub use parking_lot;
pub use rand;
pub use roaring;
use write::{BitmapClass, BlobOp, ValueClass};
#[cfg(feature = "postgres")]
use backend::postgres::PostgresStore;
#[cfg(feature = "mysql")]
use backend::mysql::MysqlStore;
#[cfg(feature = "sqlite")]
use backend::sqlite::SqliteStore;
#[cfg(feature = "foundation")]
use backend::foundationdb::FdbStore;
#[cfg(feature = "rocks")]
use backend::rocksdb::RocksDbStore;
pub trait Deserialize: Sized + Sync + Send {
fn deserialize(bytes: &[u8]) -> crate::Result<Self>;
}
@ -170,20 +182,32 @@ pub struct IterateParams<T: Key> {
#[derive(Clone)]
pub enum Store {
#[cfg(feature = "sqlite")]
SQLite(Arc<SqliteStore>),
#[cfg(feature = "foundation")]
FoundationDb(Arc<FdbStore>),
#[cfg(feature = "postgres")]
PostgreSQL(Arc<PostgresStore>),
#[cfg(feature = "mysql")]
MySQL(Arc<MysqlStore>),
#[cfg(feature = "rocks")]
RocksDb(Arc<RocksDbStore>),
}
#[derive(Clone)]
pub enum BlobStore {
Fs(Arc<FsStore>),
S3(Arc<S3Store>),
#[cfg(feature = "sqlite")]
Sqlite(Arc<SqliteStore>),
#[cfg(feature = "foundation")]
FoundationDb(Arc<FdbStore>),
#[cfg(feature = "postgres")]
PostgreSQL(Arc<PostgresStore>),
#[cfg(feature = "mysql")]
MySQL(Arc<MysqlStore>),
#[cfg(feature = "rocks")]
RocksDb(Arc<RocksDbStore>),
}
#[derive(Clone)]
@ -191,30 +215,41 @@ pub enum FtsStore {
Store(Store),
}
#[cfg(feature = "sqlite")]
impl From<SqliteStore> for Store {
fn from(store: SqliteStore) -> Self {
Self::SQLite(Arc::new(store))
}
}
#[cfg(feature = "foundation")]
impl From<FdbStore> for Store {
fn from(store: FdbStore) -> Self {
Self::FoundationDb(Arc::new(store))
}
}
#[cfg(feature = "postgres")]
impl From<PostgresStore> for Store {
fn from(store: PostgresStore) -> Self {
Self::PostgreSQL(Arc::new(store))
}
}
#[cfg(feature = "mysql")]
impl From<MysqlStore> for Store {
fn from(store: MysqlStore) -> Self {
Self::MySQL(Arc::new(store))
}
}
#[cfg(feature = "rocks")]
impl From<RocksDbStore> for Store {
fn from(store: RocksDbStore) -> Self {
Self::RocksDb(Arc::new(store))
}
}
impl From<FsStore> for BlobStore {
fn from(store: FsStore) -> Self {
Self::Fs(Arc::new(store))
@ -236,10 +271,16 @@ impl From<Store> for FtsStore {
impl From<Store> for BlobStore {
fn from(store: Store) -> Self {
match store {
#[cfg(feature = "sqlite")]
Store::SQLite(store) => Self::Sqlite(store),
#[cfg(feature = "foundation")]
Store::FoundationDb(store) => Self::FoundationDb(store),
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => Self::PostgreSQL(store),
#[cfg(feature = "mysql")]
Store::MySQL(store) => Self::MySQL(store),
#[cfg(feature = "rocks")]
Store::RocksDb(store) => Self::RocksDb(store),
}
}
}

View file

@ -26,17 +26,17 @@ use roaring::RoaringBitmap;
use crate::U64_LEN;
pub(crate) const WORD_SIZE_BITS_L: u32 = (WORD_SIZE_L * 8) as u32;
pub(crate) const WORD_SIZE_L: usize = std::mem::size_of::<u128>();
pub(crate) const WORDS_PER_BLOCK_L: u32 = 8;
pub(crate) const BITS_PER_BLOCK_L: u32 = WORD_SIZE_BITS_L * WORDS_PER_BLOCK_L;
pub(crate) const BITS_MASK_L: u32 = BITS_PER_BLOCK_L - 1;
pub const WORD_SIZE_BITS_L: u32 = (WORD_SIZE_L * 8) as u32;
pub const WORD_SIZE_L: usize = std::mem::size_of::<u128>();
pub const WORDS_PER_BLOCK_L: u32 = 8;
pub const BITS_PER_BLOCK_L: u32 = WORD_SIZE_BITS_L * WORDS_PER_BLOCK_L;
pub const BITS_MASK_L: u32 = BITS_PER_BLOCK_L - 1;
pub(crate) const WORD_SIZE_BITS_S: u32 = (WORD_SIZE_S * 8) as u32;
pub(crate) const WORD_SIZE_S: usize = U64_LEN;
pub(crate) const WORDS_PER_BLOCK_S: u32 = 16;
pub(crate) const BITS_PER_BLOCK_S: u32 = WORD_SIZE_BITS_S * WORDS_PER_BLOCK_S;
pub(crate) const BITS_MASK_S: u32 = BITS_PER_BLOCK_S - 1;
pub const WORD_SIZE_BITS_S: u32 = (WORD_SIZE_S * 8) as u32;
pub const WORD_SIZE_S: usize = U64_LEN;
pub const WORDS_PER_BLOCK_S: u32 = 16;
pub const BITS_PER_BLOCK_S: u32 = WORD_SIZE_BITS_S * WORDS_PER_BLOCK_S;
pub const BITS_MASK_S: u32 = BITS_PER_BLOCK_S - 1;
pub struct DenseBitmap {
pub bitmap: [u8; WORD_SIZE_L * WORDS_PER_BLOCK_L as usize],

View file

@ -6,7 +6,7 @@ resolver = "2"
[dependencies]
rustls = { version = "0.21", features = ["tls12", "dangerous_configuration"]}
rustls-pemfile = "1.0"
rustls-pemfile = "2.0"
tokio = { version = "1.23", features = ["net", "macros"] }
tokio-rustls = { version = "0.24.0"}
serde = { version = "1.0", features = ["derive"]}
@ -25,7 +25,7 @@ dashmap = "5.4"
ahash = { version = "0.8" }
chrono = "0.4"
rand = "0.8.5"
webpki-roots = { version = "0.25.2"}
webpki-roots = { version = "0.26"}
[target.'cfg(unix)'.dependencies]
privdrop = "0.5.3"

View file

@ -57,15 +57,16 @@ impl Config {
cert_id,
"cert",
))?))
.collect::<Result<Vec<_>, _>>()
.map_err(|err| {
format!("Failed to read certificates in \"certificate.{cert_id}.cert\": {err}")
})?
.into_iter()
.map(Certificate)
.collect::<Vec<_>>();
})?;
if !certs.is_empty() {
Ok(certs)
Ok(certs
.into_iter()
.map(|cert| Certificate(cert.as_ref().to_vec()))
.collect())
} else {
Err(format!(
"No certificates found in \"certificate.{cert_id}.cert\"."
@ -85,7 +86,9 @@ impl Config {
.into_iter()
.next()
{
Some(Item::PKCS8Key(key) | Item::RSAKey(key) | Item::ECKey(key)) => Ok(PrivateKey(key)),
Some(Item::Pkcs8Key(key)) => Ok(PrivateKey(key.secret_pkcs8_der().to_vec())),
Some(Item::Pkcs1Key(key)) => Ok(PrivateKey(key.secret_pkcs1_der().to_vec())),
Some(Item::Sec1Key(key)) => Ok(PrivateKey(key.secret_sec1_der().to_vec())),
Some(_) => Err(format!(
"Unsupported private keys found in \"certificate.{cert_id}.private-key\".",
)),

View file

@ -239,9 +239,9 @@ pub fn rustls_client_config(allow_invalid_certs: bool) -> ClientConfig {
root_cert_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
OwnedTrustAnchor::from_subject_spki_name_constraints(
ta.subject,
ta.spki,
ta.name_constraints,
ta.subject.as_ref(),
ta.subject_public_key_info.as_ref(),
ta.name_constraints.as_ref().map(|v| v.as_ref()),
)
}));
config

View file

@ -5,9 +5,13 @@ edition = "2021"
resolver = "2"
[features]
default = ["sqlite", "foundationdb"]
#default = ["sqlite", "foundationdb", "postgres", "mysql"]
default = ["rocks"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation"]
postgres = ["store/postgres"]
mysql = ["store/mysql"]
rocks = ["store/rocks"]
[dev-dependencies]
store = { path = "../crates/store", features = ["test_mode"] }
@ -29,7 +33,7 @@ mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features =
tokio = { version = "1.23", features = ["full"] }
tokio-rustls = { version = "0.24.0"}
rustls = "0.21.0"
rustls-pemfile = "1.0"
rustls-pemfile = "2.0"
csv = "1.1"
rayon = { version = "1.5.1" }
flate2 = { version = "1.0.17", features = ["zlib"], default-features = false }

View file

@ -278,14 +278,10 @@ pub fn dummy_tls_acceptor() -> Arc<TlsAcceptor> {
// convert files to key/cert objects
let cert_chain = certs(cert_file)
.unwrap()
.into_iter()
.map(Certificate)
.map(|v| Certificate(v.unwrap().as_ref().to_vec()))
.collect();
let mut keys: Vec<PrivateKey> = pkcs8_private_keys(key_file)
.unwrap()
.into_iter()
.map(PrivateKey)
.map(|v| PrivateKey(v.unwrap().secret_pkcs8_der().to_vec()))
.collect();
// exit if no keys could be parsed

View file

@ -137,7 +137,7 @@ future-release = [ { if = "authenticated-as", ne = "", then = "99999999d"},
{ else = false } ]
[store.db]
#path = "PATH/sqlite.db"
path = "{TMP}/sqlite.db"
host = "localhost"
#port = 5432
port = 3307

View file

@ -135,7 +135,7 @@ future-release = [ { if = "authenticated-as", ne = "", then = "99999999d"},
{ else = false } ]
[store.db]
#path = "PATH/sqlite.db"
path = "{TMP}/sqlite.db"
host = "localhost"
#port = 5432
port = 3307

View file

@ -22,9 +22,7 @@
*/
use store::{
backend::{
fs::FsStore, mysql::MysqlStore, postgres::PostgresStore, s3::S3Store, sqlite::SqliteStore,
},
backend::rocksdb::RocksDbStore,
write::{blob::BlobQuota, now, BatchBuilder, BlobOp, F_CLEAR},
BlobClass, BlobHash, BlobStore, Store,
};
@ -48,7 +46,8 @@ path = "{TMP}"
const CONFIG_DB: &str = r#"
[store.db]
#path = "PATH/sqlite.db"
#path = "{TMP}/sqlite.db"
path = "{TMP}/rocksdb"
host = "localhost"
#port = 5432
port = 3307
@ -83,7 +82,8 @@ pub async fn blob_tests() {
//let store: Store = SqliteStore::open(
//let store: Store = FdbStore::open(
//let store: Store = PostgresStore::open(
let store: Store = MysqlStore::open(
//let store: Store = MysqlStore::open(
let store: Store = RocksDbStore::open(
&Config::new(&CONFIG_DB.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap()))
.unwrap(),
)

View file

@ -29,9 +29,7 @@ use std::io::Read;
use ::store::Store;
use store::backend::{
foundationdb::FdbStore, mysql::MysqlStore, postgres::PostgresStore, sqlite::SqliteStore,
};
use store::backend::rocksdb::RocksDbStore;
use utils::config::Config;
pub struct TempDir {
@ -41,10 +39,11 @@ pub struct TempDir {
const CONFIG: &str = r#"
[store.blob]
type = "local"
local.path = "PATH"
local.path = "{TMP}"
[store.db]
#path = "PATH/sqlite.db"
#path = "{TMP}/sqlite.db"
path = "{TMP}/rocksdb"
host = "localhost"
#port = 5432
port = 3307
@ -60,20 +59,23 @@ password = "password"
pub async fn store_tests() {
let insert = true;
let temp_dir = TempDir::new("store_tests", insert);
let config_file = CONFIG.replace("PATH", &temp_dir.path.to_string_lossy());
let config_file = CONFIG.replace("{TMP}", &temp_dir.path.to_string_lossy());
//let db: Store = SqliteStore::open(&Config::new(&config_file).unwrap())
//let db: Store = FdbStore::open(&Config::new(&config_file).unwrap())
//let db: Store = PostgresStore::open(&Config::new(&config_file).unwrap())
let db: Store = MysqlStore::open(&Config::new(&config_file).unwrap())
//let db: Store = MysqlStore::open(&Config::new(&config_file).unwrap())
let db: Store = RocksDbStore::open(&Config::new(&config_file).unwrap())
.await
.unwrap()
.into();
if insert {
db.destroy().await;
}
//query::test(db.clone(), insert).await;
query::test(db.clone(), insert).await;
assign_id::test(db).await;
temp_dir.delete();
if insert {
temp_dir.delete();
}
}
pub fn deflate_artwork_data() -> Vec<u8> {