From b7869901eeddeee824d2ecd85a99d2ccd364857d Mon Sep 17 00:00:00 2001 From: mdecimus Date: Tue, 12 Dec 2023 18:45:52 +0100 Subject: [PATCH] Redis lookup backend implementation --- Cargo.lock | 78 +++++++++++ crates/main/Cargo.toml | 5 +- crates/store/Cargo.toml | 3 + crates/store/src/backend/mod.rs | 2 + crates/store/src/backend/redis/lookup.rs | 90 +++++++++++++ crates/store/src/backend/redis/mod.rs | 159 +++++++++++++++++++++++ crates/store/src/backend/redis/pool.rs | 83 ++++++++++++ crates/store/src/config.rs | 10 ++ crates/store/src/dispatch/lookup.rs | 12 +- crates/store/src/dispatch/store.rs | 2 +- crates/store/src/lib.rs | 12 ++ tests/Cargo.toml | 5 +- tests/src/smtp/inbound/antispam.rs | 4 + tests/src/store/lookup.rs | 121 +++++++++++++++++ tests/src/store/mod.rs | 6 + 15 files changed, 586 insertions(+), 6 deletions(-) create mode 100644 crates/store/src/backend/redis/lookup.rs create mode 100644 crates/store/src/backend/redis/mod.rs create mode 100644 crates/store/src/backend/redis/pool.rs create mode 100644 tests/src/store/lookup.rs diff --git a/Cargo.lock b/Cargo.lock index a69bbbb2..c2eebee9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -938,6 +938,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "console" version = "0.15.7" @@ -1000,6 +1014,12 @@ dependencies = [ "libc", ] +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "crc32fast" version = "1.3.2" @@ -4347,6 +4367,37 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redis" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +dependencies = [ + "async-trait", + "bytes", + "combine", + "crc16", + "futures", + "futures-util", + "itoa", + "log", + "percent-encoding", + "pin-project-lite", + "rand", + "rustls 0.21.10", + "rustls-native-certs", + "rustls-pemfile 1.0.4", + "rustls-webpki 0.101.7", + "ryu", + "sha1_smol", + "socket2 0.4.10", + "tokio", + "tokio-rustls 0.24.1", + "tokio-util", + "url", + "webpki-roots 0.23.1", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -4833,6 +4884,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7673e0aa20ee4937c6aacfc12bb8341cfbf054cdd21df6bec5fd0629fe9339b" +[[package]] +name = "rustls-webpki" +version = "0.100.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3" +dependencies = [ + "ring 0.16.20", + "untrusted 0.7.1", +] + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -5168,6 +5229,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "sha1collisiondetection" version = "0.3.2" @@ -5467,6 +5534,7 @@ dependencies = [ "async-trait", "blake3", "bytes", + "deadpool", "deadpool-postgres", "elasticsearch", "farmhash", @@ -5483,6 +5551,7 @@ dependencies = [ "r2d2", "rand", "rayon", + "redis", "regex", "reqwest", "ring 0.17.7", @@ -6509,6 +6578,15 @@ dependencies = [ "webpki", ] +[[package]] +name = "webpki-roots" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338" +dependencies = [ + "rustls-webpki 0.100.3", +] + [[package]] name = "webpki-roots" version = "0.25.3" diff --git a/crates/main/Cargo.toml b/crates/main/Cargo.toml index f7e0c6bb..95f17b52 100644 --- a/crates/main/Cargo.toml +++ b/crates/main/Cargo.toml @@ -31,8 +31,8 @@ tracing = "0.1" jemallocator = "0.5.0" [features] -#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3"] -default = ["sqlite", "postgres", "mysql", "foundationdb", "rocks"] +#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3", "redis"] +default = ["sqlite", "postgres", "mysql", "redis"] sqlite = ["store/sqlite"] foundationdb = ["store/foundation"] postgres = ["store/postgres"] @@ -40,3 +40,4 @@ mysql = ["store/mysql"] rocks = ["store/rocks"] elastic = ["store/elastic"] s3 = ["store/s3"] +redis = ["store/redis"] diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 5e414871..7044f629 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -42,6 +42,8 @@ regex = "1.7.0" reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-webpki-roots", "blocking"] } flate2 = "1.0" async-trait = "0.1.68" +redis = { version = "0.24.0", features = [ "tokio-comp", "tokio-rustls-comp", "tls-rustls-insecure", "tls-rustls-webpki-roots", "cluster-async"], optional = true } +deadpool = { version = "0.10.0", features = ["managed"], optional = true } [dev-dependencies] tokio = { version = "1.23", features = ["full"] } @@ -55,6 +57,7 @@ mysql = ["mysql_async"] s3 = ["rust-s3"] foundation = ["foundationdb", "futures"] fdb-chunked-bm = [] +redis = ["dep:redis", "deadpool"] test_mode = [] diff --git a/crates/store/src/backend/mod.rs b/crates/store/src/backend/mod.rs index d7fe5104..dc4aaa3f 100644 --- a/crates/store/src/backend/mod.rs +++ b/crates/store/src/backend/mod.rs @@ -31,6 +31,8 @@ pub mod memory; pub mod mysql; #[cfg(feature = "postgres")] pub mod postgres; +#[cfg(feature = "redis")] +pub mod redis; #[cfg(feature = "rocks")] pub mod rocksdb; #[cfg(feature = "s3")] diff --git a/crates/store/src/backend/redis/lookup.rs b/crates/store/src/backend/redis/lookup.rs new file mode 100644 index 00000000..493ad4e7 --- /dev/null +++ b/crates/store/src/backend/redis/lookup.rs @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use redis::AsyncCommands; + +use crate::{Deserialize, LookupKey, LookupValue}; + +use super::{RedisPool, RedisStore}; + +impl RedisStore { + pub async fn key_set(&self, key: Vec, value: LookupValue>) -> crate::Result<()> { + match &self.pool { + RedisPool::Single(pool) => self.key_set_(pool.get().await?.as_mut(), key, value).await, + RedisPool::Cluster(pool) => self.key_set_(pool.get().await?.as_mut(), key, value).await, + } + } + + pub async fn key_get( + &self, + key: LookupKey, + ) -> crate::Result> { + match &self.pool { + RedisPool::Single(pool) => self.key_get_(pool.get().await?.as_mut(), key).await, + RedisPool::Cluster(pool) => self.key_get_(pool.get().await?.as_mut(), key).await, + } + } + + async fn key_get_( + &self, + conn: &mut impl AsyncCommands, + key: LookupKey, + ) -> crate::Result> { + match key { + LookupKey::Key(key) => { + if let Some(value) = conn.get::<_, Option>>(key).await? { + T::deserialize(&value).map(|value| LookupValue::Value { value, expires: 0 }) + } else { + Ok(LookupValue::None) + } + } + LookupKey::Counter(key) => { + let value: Option = conn.get(key).await?; + Ok(LookupValue::Counter { + num: value.unwrap_or(0), + }) + } + } + } + + async fn key_set_( + &self, + conn: &mut impl AsyncCommands, + key: Vec, + value: LookupValue>, + ) -> crate::Result<()> { + match value { + LookupValue::Value { value, expires } => { + if expires > 0 { + conn.set_ex(key, value, expires).await?; + } else { + conn.set(key, value).await?; + } + } + LookupValue::Counter { num } => conn.incr(key, num).await?, + LookupValue::None => (), + } + + Ok(()) + } +} diff --git a/crates/store/src/backend/redis/mod.rs b/crates/store/src/backend/redis/mod.rs new file mode 100644 index 00000000..207d7441 --- /dev/null +++ b/crates/store/src/backend/redis/mod.rs @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::time::Duration; + +use deadpool::{ + managed::{Manager, Pool, PoolError}, + Runtime, +}; +use redis::{ + cluster::{ClusterClient, ClusterClientBuilder}, + Client, RedisError, +}; +use utils::config::{utils::AsKey, Config}; + +pub mod lookup; +pub mod pool; + +pub struct RedisStore { + pool: RedisPool, +} + +struct RedisConnectionManager { + client: Client, + timeout: Duration, +} + +struct RedisClusterConnectionManager { + client: ClusterClient, + timeout: Duration, +} + +enum RedisPool { + Single(Pool), + Cluster(Pool), +} + +impl RedisStore { + pub async fn open(config: &Config, prefix: impl AsKey) -> crate::Result { + let prefix = prefix.as_key(); + + let db = if let Some(url) = config.value((&prefix, "url")) { + Self { + pool: RedisPool::Single(build_pool( + config, + &prefix, + RedisConnectionManager { + client: Client::open(url)?, + timeout: config.property_or_static((&prefix, "timeout"), "10s")?, + }, + )?), + } + } else { + let addresses = config + .values((&prefix, "urls")) + .map(|(_, v)| v.to_string()) + .collect::>(); + if addresses.is_empty() { + return Err(crate::Error::InternalError(format!( + "No Redis cluster URLs specified for {prefix:?}" + ))); + } + let mut builder = ClusterClientBuilder::new(addresses.into_iter()); + if let Some(value) = config.property((&prefix, "username"))? { + builder = builder.username(value); + } + if let Some(value) = config.property((&prefix, "password"))? { + builder = builder.password(value); + } + if let Some(value) = config.property((&prefix, "retries"))? { + builder = builder.retries(value); + } + if let Some(value) = config.property::((&prefix, "max-retry-wait"))? { + builder = builder.max_retry_wait(value.as_secs()); + } + if let Some(value) = config.property::((&prefix, "min-retry-wait"))? { + builder = builder.min_retry_wait(value.as_secs()); + } + if let Some(true) = config.property::((&prefix, "read-from-replicas"))? { + builder = builder.read_from_replicas(); + } + Self { + pool: RedisPool::Cluster(build_pool( + config, + &prefix, + RedisClusterConnectionManager { + client: builder.build()?, + timeout: config.property_or_static((&prefix, "timeout"), "10s")?, + }, + )?), + } + }; + + Ok(db) + } +} + +fn build_pool( + config: &Config, + prefix: &str, + manager: M, +) -> utils::config::Result> { + Pool::builder(manager) + .runtime(Runtime::Tokio1) + .max_size(config.property_or_static((prefix, "pool.max-connections"), "10")?) + .create_timeout( + config + .property_or_static::((prefix, "pool.create-timeout"), "30s")? + .into(), + ) + .wait_timeout(config.property_or_static((prefix, "pool.wait-timeout"), "30s")?) + .recycle_timeout(config.property_or_static((prefix, "pool.recycle-timeout"), "30s")?) + .build() + .map_err(|err| { + format!( + "Failed to build pool for {prefix:?}: {err}", + prefix = prefix, + err = err + ) + }) +} + +impl From> for crate::Error { + fn from(value: PoolError) -> Self { + crate::Error::InternalError(format!("Redis pool error: {}", value)) + } +} + +impl From> for crate::Error { + fn from(value: PoolError) -> Self { + crate::Error::InternalError(format!("Connection pool {}", value)) + } +} + +impl From for crate::Error { + fn from(value: RedisError) -> Self { + crate::Error::InternalError(format!("Redis error: {}", value)) + } +} diff --git a/crates/store/src/backend/redis/pool.rs b/crates/store/src/backend/redis/pool.rs new file mode 100644 index 00000000..8953c5a5 --- /dev/null +++ b/crates/store/src/backend/redis/pool.rs @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use async_trait::async_trait; +use deadpool::managed; +use redis::{ + aio::{Connection, ConnectionLike}, + cluster_async::ClusterConnection, +}; + +use super::{RedisClusterConnectionManager, RedisConnectionManager}; + +#[async_trait] +impl managed::Manager for RedisConnectionManager { + type Type = Connection; + type Error = crate::Error; + + async fn create(&self) -> Result { + match tokio::time::timeout(self.timeout, self.client.get_tokio_connection()).await { + Ok(conn) => conn.map_err(Into::into), + Err(_) => Err(crate::Error::InternalError( + "Redis connection timeout".into(), + )), + } + } + + async fn recycle( + &self, + conn: &mut Connection, + _: &managed::Metrics, + ) -> managed::RecycleResult { + conn.req_packed_command(&redis::cmd("PING")) + .await + .map(|_| ()) + .map_err(|err| managed::RecycleError::Backend(err.into())) + } +} + +#[async_trait] +impl managed::Manager for RedisClusterConnectionManager { + type Type = ClusterConnection; + type Error = crate::Error; + + async fn create(&self) -> Result { + match tokio::time::timeout(self.timeout, self.client.get_async_connection()).await { + Ok(conn) => conn.map_err(Into::into), + Err(_) => Err(crate::Error::InternalError( + "Redis connection timeout".into(), + )), + } + } + + async fn recycle( + &self, + conn: &mut ClusterConnection, + _: &managed::Metrics, + ) -> managed::RecycleResult { + conn.req_packed_command(&redis::cmd("PING")) + .await + .map(|_| ()) + .map_err(|err| managed::RecycleError::Backend(err.into())) + } +} diff --git a/crates/store/src/config.rs b/crates/store/src/config.rs index f67de2a4..db23a876 100644 --- a/crates/store/src/config.rs +++ b/crates/store/src/config.rs @@ -53,6 +53,9 @@ use crate::backend::rocksdb::RocksDbStore; #[cfg(feature = "elastic")] use crate::backend::elastic::ElasticSearchStore; +#[cfg(feature = "redis")] +use crate::backend::redis::RedisStore; + #[async_trait] pub trait ConfigStore { async fn parse_stores(&self) -> utils::config::Result; @@ -161,6 +164,13 @@ impl ConfigStore for Config { ); continue; } + #[cfg(feature = "redis")] + "redis" => { + config + .lookup_stores + .insert(store_id, RedisStore::open(self, prefix).await?.into()); + continue; + } "memory" => { let prefix = prefix.as_key(); for lookup_id in self.sub_keys((&prefix, "lookup")) { diff --git a/crates/store/src/dispatch/lookup.rs b/crates/store/src/dispatch/lookup.rs index 2b848b0c..696ce235 100644 --- a/crates/store/src/dispatch/lookup.rs +++ b/crates/store/src/dispatch/lookup.rs @@ -49,6 +49,10 @@ impl LookupStore { )), }, LookupStore::Memory(store) => store.query(query, params), + #[cfg(feature = "redis")] + LookupStore::Redis(_) => Err(crate::Error::InternalError( + "Redis does not support queries".into(), + )), }; tracing::trace!( context = "store", event = "query", query = query, result = ?result); @@ -81,6 +85,8 @@ impl LookupStore { batch.ops.push(Operation::Value { class, op }); store.write(batch.build()).await } + #[cfg(feature = "redis")] + LookupStore::Redis(store) => store.key_set(key, value).await, LookupStore::Memory(_) => unimplemented!(), } } @@ -110,6 +116,8 @@ impl LookupStore { .await .map(|num| LookupValue::Counter { num }), }, + #[cfg(feature = "redis")] + LookupStore::Redis(store) => store.key_get(key).await, LookupStore::Memory(_) => unimplemented!(), } } @@ -137,7 +145,7 @@ impl LookupStore { store .iterate(IterateParams::new(from_key, to_key), |key, value| { if value.deserialize_be_u64(0)? < current_time { - expired_keys.push(key.to_vec()); + expired_keys.push(key.get(1..).unwrap_or_default().to_vec()); } Ok(true) }) @@ -159,6 +167,8 @@ impl LookupStore { } } } + #[cfg(feature = "redis")] + LookupStore::Redis(store) => {} LookupStore::Memory(_) => {} } diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs index 126a3757..f7d8a890 100644 --- a/crates/store/src/dispatch/store.rs +++ b/crates/store/src/dispatch/store.rs @@ -434,7 +434,7 @@ impl Store { value ); } - SUBSPACE_INDEX_VALUES if key[0] >= 2 => { + SUBSPACE_INDEX_VALUES if key[0] >= 3 => { // Ignore named keys return Ok(true); } diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 56c0b831..e88ad5ae 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -60,6 +60,9 @@ use backend::rocksdb::RocksDbStore; #[cfg(feature = "elastic")] use backend::elastic::ElasticSearchStore; +#[cfg(feature = "redis")] +use backend::redis::RedisStore; + pub trait Deserialize: Sized + Sync + Send { fn deserialize(bytes: &[u8]) -> crate::Result; } @@ -236,6 +239,8 @@ pub enum FtsStore { pub enum LookupStore { Store(Store), Memory(Arc), + #[cfg(feature = "redis")] + Redis(Arc), } #[cfg(feature = "sqlite")] @@ -293,6 +298,13 @@ impl From for FtsStore { } } +#[cfg(feature = "redis")] +impl From for LookupStore { + fn from(store: RedisStore) -> Self { + Self::Redis(Arc::new(store)) + } +} + impl From for FtsStore { fn from(store: Store) -> Self { Self::Store(store) diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 87fe4a82..95173913 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -5,8 +5,8 @@ edition = "2021" resolver = "2" [features] -#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3"] -default = ["sqlite", "postgres", "mysql", "foundationdb", "rocks"] +#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3", "redis"] +default = ["sqlite", "postgres", "mysql", "redis"] sqlite = ["store/sqlite"] foundationdb = ["store/foundation"] postgres = ["store/postgres"] @@ -14,6 +14,7 @@ mysql = ["store/mysql"] rocks = ["store/rocks"] elastic = ["store/elastic"] s3 = ["store/s3"] +redis = ["store/redis"] [dev-dependencies] store = { path = "../crates/store", features = ["test_mode"] } diff --git a/tests/src/smtp/inbound/antispam.rs b/tests/src/smtp/inbound/antispam.rs index d5be8012..69c0624d 100644 --- a/tests/src/smtp/inbound/antispam.rs +++ b/tests/src/smtp/inbound/antispam.rs @@ -46,6 +46,10 @@ duplicate-expiry = "7d" type = "sqlite" path = "%PATH%/test_antispam.db" +#[store."redis"] +#type = "redis" +#url = "redis://127.0.0.1" + [store."default"] type = "memory" diff --git a/tests/src/store/lookup.rs b/tests/src/store/lookup.rs new file mode 100644 index 00000000..70d3fb61 --- /dev/null +++ b/tests/src/store/lookup.rs @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2023, Stalwart Labs Ltd. + * + * This file is part of the Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use store::{config::ConfigStore, LookupKey, LookupStore, LookupValue}; +use utils::config::Config; + +use crate::store::{TempDir, CONFIG}; + +#[tokio::test] +pub async fn lookup_tests() { + let temp_dir = TempDir::new("lookup_tests", true); + let config = + Config::new(&CONFIG.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap())).unwrap(); + let stores = config.parse_stores().await.unwrap(); + + for (store_id, store) in stores.lookup_stores { + println!("Testing lookup store {}...", store_id); + if let LookupStore::Store(store) = &store { + store.destroy().await; + } + + // Test value expiry + let key = "xyz".as_bytes().to_vec(); + assert_eq!( + LookupValue::None, + store + .key_get::(LookupKey::Key(key.clone())) + .await + .unwrap() + ); + store + .key_set( + key.clone(), + LookupValue::Value { + value: "hello".to_string().into_bytes(), + expires: 1, + }, + ) + .await + .unwrap(); + assert!(matches!(store + .key_get::(LookupKey::Key(key.clone())) + .await + .unwrap(), LookupValue::Value { value,.. } if value == "hello")); + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + assert_eq!( + LookupValue::None, + store + .key_get::(LookupKey::Key(key.clone())) + .await + .unwrap() + ); + + store.purge_expired().await.unwrap(); + if let LookupStore::Store(store) = &store { + store.assert_is_empty(store.clone().into()).await; + } + + // Test key + store + .key_set( + key.clone(), + LookupValue::Value { + value: "world".to_string().into_bytes(), + expires: 0, + }, + ) + .await + .unwrap(); + store.purge_expired().await.unwrap(); + assert!(matches!(store + .key_get::(LookupKey::Key(key.clone())) + .await + .unwrap(), LookupValue::Value { value,.. } if value == "world")); + + // Test counter + let key = "abc".as_bytes().to_vec(); + store + .key_set(key.clone(), LookupValue::Counter { num: 1 }) + .await + .unwrap(); + assert_eq!( + LookupValue::Counter { num: 1 }, + store + .key_get::(LookupKey::Counter(key.clone())) + .await + .unwrap() + ); + store + .key_set(key.clone(), LookupValue::Counter { num: 2 }) + .await + .unwrap(); + assert_eq!( + LookupValue::Counter { num: 3 }, + store + .key_get::(LookupKey::Counter(key.clone())) + .await + .unwrap() + ); + } +} diff --git a/tests/src/store/mod.rs b/tests/src/store/mod.rs index 605cd3f6..b3ec5c79 100644 --- a/tests/src/store/mod.rs +++ b/tests/src/store/mod.rs @@ -23,6 +23,7 @@ pub mod assign_id; pub mod blob; +pub mod lookup; pub mod ops; pub mod query; @@ -74,6 +75,11 @@ port = 3307 database = "stalwart" user = "root" password = "password" + +[store."redis"] +type = "redis" +url = "redis://127.0.0.1" + "#; #[tokio::test]