From f989f4f750d129748f549745c2854872fe43335d Mon Sep 17 00:00:00 2001 From: mdecimus Date: Sat, 2 Mar 2024 18:02:30 +0100 Subject: [PATCH] LZ4 compress blobs by default (closes #227) --- Cargo.lock | 3 - crates/imap/src/op/fetch.rs | 2 +- crates/jmap/Cargo.toml | 2 +- crates/jmap/src/blob/download.rs | 7 +- crates/jmap/src/blob/upload.rs | 13 +- crates/jmap/src/email/get.rs | 3 +- crates/jmap/src/email/snippet.rs | 17 ++- crates/jmap/src/services/index.rs | 5 +- crates/jmap/src/services/ingest.rs | 2 +- crates/jmap/src/sieve/get.rs | 2 +- crates/jmap/src/submission/set.rs | 30 ++-- crates/smtp/src/outbound/session.rs | 2 +- crates/store/Cargo.toml | 2 +- crates/store/src/backend/foundationdb/blob.rs | 2 +- crates/store/src/backend/fs/mod.rs | 11 +- crates/store/src/backend/mysql/blob.rs | 9 +- crates/store/src/backend/postgres/blob.rs | 9 +- crates/store/src/backend/rocksdb/blob.rs | 9 +- crates/store/src/backend/s3/mod.rs | 4 +- crates/store/src/backend/sqlite/blob.rs | 9 +- crates/store/src/config.rs | 55 ++++--- crates/store/src/dispatch/blob.rs | 143 +++++++++++++++--- crates/store/src/dispatch/store.rs | 6 +- crates/store/src/lib.rs | 29 +++- tests/src/smtp/inbound/mod.rs | 2 +- tests/src/smtp/mod.rs | 2 +- tests/src/smtp/queue/dsn.rs | 2 +- tests/src/store/blob.rs | 18 +-- 28 files changed, 256 insertions(+), 144 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ac50721b..16ef4193 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3134,9 +3134,6 @@ name = "lz4_flex" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "912b45c753ff5f7f5208307e8ace7d2a2e30d024e26d3509f3dce546c044ce15" -dependencies = [ - "twox-hash", -] [[package]] name = "mail-auth" diff --git a/crates/imap/src/op/fetch.rs b/crates/imap/src/op/fetch.rs index 8f8c3814..4ea3c7af 100644 --- a/crates/imap/src/op/fetch.rs +++ b/crates/imap/src/op/fetch.rs @@ -318,7 +318,7 @@ impl SessionData { // Fetch and parse blob let raw_message = if needs_blobs { // Retrieve raw message if needed - match self.jmap.get_blob(&email.blob_hash, 0..u32::MAX).await { + match self.jmap.get_blob(&email.blob_hash, 0..usize::MAX).await { Ok(Some(raw_message)) => raw_message, Ok(None) => { tracing::warn!(event = "not-found", diff --git a/crates/jmap/Cargo.toml b/crates/jmap/Cargo.toml index 8cb53609..69902ddd 100644 --- a/crates/jmap/Cargo.toml +++ b/crates/jmap/Cargo.toml @@ -50,7 +50,7 @@ rasn-cms = "0.10" rasn-pkix = "0.10" rsa = "0.9.2" async-trait = "0.1.68" -lz4_flex = { version = "0.11" } +lz4_flex = { version = "0.11", default-features = false } [dev-dependencies] ece = "2.2" diff --git a/crates/jmap/src/blob/download.rs b/crates/jmap/src/blob/download.rs index 2de27a93..0e054cc5 100644 --- a/crates/jmap/src/blob/download.rs +++ b/crates/jmap/src/blob/download.rs @@ -102,7 +102,7 @@ impl JMAP { if let Some(section) = &blob_id.section { self.get_blob_section(&blob_id.hash, section).await } else { - self.get_blob(&blob_id.hash, 0..u32::MAX).await + self.get_blob(&blob_id.hash, 0..usize::MAX).await } } @@ -114,8 +114,7 @@ impl JMAP { Ok(self .get_blob( hash, - (section.offset_start as u32) - ..(section.offset_start.saturating_add(section.size) as u32), + (section.offset_start)..(section.offset_start.saturating_add(section.size)), ) .await? .and_then(|bytes| match Encoding::from(section.encoding) { @@ -128,7 +127,7 @@ impl JMAP { pub async fn get_blob( &self, hash: &BlobHash, - range: Range, + range: Range, ) -> Result>, MethodError> { match self.blob_store.get_blob(hash.as_ref(), range).await { Ok(blob) => Ok(blob), diff --git a/crates/jmap/src/blob/upload.rs b/crates/jmap/src/blob/upload.rs index abae38ba..2c55d174 100644 --- a/crates/jmap/src/blob/upload.rs +++ b/crates/jmap/src/blob/upload.rs @@ -95,22 +95,19 @@ impl JMAP { continue 'outer; } - let offset = offset.unwrap_or(0) as u32; + let offset = offset.unwrap_or(0); let length = length - .map(|length| (length as u32).saturating_add(offset)) - .unwrap_or(u32::MAX); + .map(|length| length.saturating_add(offset)) + .unwrap_or(usize::MAX); let bytes = if let Some(section) = &id.section { self.get_blob_section(&id.hash, section) .await? .map(|bytes| { - if offset == 0 && length == u32::MAX { + if offset == 0 && length == usize::MAX { bytes } else { bytes - .get( - offset as usize - ..std::cmp::min(length as usize, bytes.len()), - ) + .get(offset..std::cmp::min(length, bytes.len())) .unwrap_or_default() .to_vec() } diff --git a/crates/jmap/src/email/get.rs b/crates/jmap/src/email/get.rs index 924e793c..b2a688bd 100644 --- a/crates/jmap/src/email/get.rs +++ b/crates/jmap/src/email/get.rs @@ -169,7 +169,8 @@ impl JMAP { // Retrieve raw message if needed let raw_message = if needs_body { - if let Some(raw_message) = self.get_blob(&metadata.blob_hash, 0..u32::MAX).await? { + if let Some(raw_message) = self.get_blob(&metadata.blob_hash, 0..usize::MAX).await? + { raw_message } else { tracing::warn!(event = "not-found", diff --git a/crates/jmap/src/email/snippet.rs b/crates/jmap/src/email/snippet.rs index 4f55dca1..2c7b59d2 100644 --- a/crates/jmap/src/email/snippet.rs +++ b/crates/jmap/src/email/snippet.rs @@ -150,19 +150,20 @@ impl JMAP { snippet.preview = body.into(); } else {*/ // Download message - let raw_message = - if let Some(raw_message) = self.get_blob(&metadata.blob_hash, 0..u32::MAX).await? { - raw_message - } else { - tracing::warn!(event = "not-found", + let raw_message = if let Some(raw_message) = + self.get_blob(&metadata.blob_hash, 0..usize::MAX).await? + { + raw_message + } else { + tracing::warn!(event = "not-found", account_id = account_id, collection = ?Collection::Email, document_id = email_id.document_id(), blob_id = ?metadata.blob_hash, "Blob not found"); - response.not_found.push(email_id); - continue; - }; + response.not_found.push(email_id); + continue; + }; // Find a matching part 'outer: for part in &metadata.contents.parts { diff --git a/crates/jmap/src/services/index.rs b/crates/jmap/src/services/index.rs index 1192055f..455d95a3 100644 --- a/crates/jmap/src/services/index.rs +++ b/crates/jmap/src/services/index.rs @@ -95,8 +95,9 @@ impl JMAP { if metadata.inner.blob_hash.as_slice() == blob_hash.as_slice() => { // Obtain raw message - let raw_message = if let Ok(Some(raw_message)) = - self.get_blob(&metadata.inner.blob_hash, 0..u32::MAX).await + let raw_message = if let Ok(Some(raw_message)) = self + .get_blob(&metadata.inner.blob_hash, 0..usize::MAX) + .await { raw_message } else { diff --git a/crates/jmap/src/services/ingest.rs b/crates/jmap/src/services/ingest.rs index cf2e56ed..fb97e8bf 100644 --- a/crates/jmap/src/services/ingest.rs +++ b/crates/jmap/src/services/ingest.rs @@ -34,7 +34,7 @@ impl JMAP { // Read message let raw_message = match self .blob_store - .get_blob(message.message_blob.as_slice(), 0..u32::MAX) + .get_blob(message.message_blob.as_slice(), 0..usize::MAX) .await { Ok(Some(raw_message)) => raw_message, diff --git a/crates/jmap/src/sieve/get.rs b/crates/jmap/src/sieve/get.rs index 8522a85b..530b9cde 100644 --- a/crates/jmap/src/sieve/get.rs +++ b/crates/jmap/src/sieve/get.rs @@ -226,7 +226,7 @@ impl JMAP { // Obtain the sieve script blob let script_bytes = self - .get_blob(&blob_id.hash, 0..u32::MAX) + .get_blob(&blob_id.hash, 0..usize::MAX) .await? .ok_or(MethodError::ServerPartialFail)?; diff --git a/crates/jmap/src/submission/set.rs b/crates/jmap/src/submission/set.rs index cf4998f0..98be8f02 100644 --- a/crates/jmap/src/submission/set.rs +++ b/crates/jmap/src/submission/set.rs @@ -559,22 +559,22 @@ impl JMAP { ); // Obtain raw message - let message = if let Some(message) = self.get_blob(&metadata.blob_hash, 0..u32::MAX).await? - { - if message.len() > self.config.mail_max_size { - return Ok(Err(SetError::new(SetErrorType::InvalidEmail) - .with_description(format!( - "Message exceeds maximum size of {} bytes.", - self.config.mail_max_size - )))); - } + let message = + if let Some(message) = self.get_blob(&metadata.blob_hash, 0..usize::MAX).await? { + if message.len() > self.config.mail_max_size { + return Ok(Err(SetError::new(SetErrorType::InvalidEmail) + .with_description(format!( + "Message exceeds maximum size of {} bytes.", + self.config.mail_max_size + )))); + } - message - } else { - return Ok(Err(SetError::invalid_properties() - .with_property(Property::EmailId) - .with_description("Blob for email not found."))); - }; + message + } else { + return Ok(Err(SetError::invalid_properties() + .with_property(Property::EmailId) + .with_description("Blob for email not found."))); + }; // Begin local SMTP session let mut session = diff --git a/crates/smtp/src/outbound/session.rs b/crates/smtp/src/outbound/session.rs index 74c55418..ec679a81 100644 --- a/crates/smtp/src/outbound/session.rs +++ b/crates/smtp/src/outbound/session.rs @@ -537,7 +537,7 @@ pub async fn send_message( .core .shared .default_blob_store - .get_blob(message.blob_hash.as_slice(), 0..u32::MAX) + .get_blob(message.blob_hash.as_slice(), 0..usize::MAX) .await { Ok(Some(raw_message)) => tokio::time::timeout(params.timeout_data, async { diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 4905d679..40cfa1cf 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -27,7 +27,7 @@ lru-cache = { version = "0.1.2", optional = true } num_cpus = { version = "1.15.0", optional = true } blake3 = "1.3.3" tracing = "0.1" -lz4_flex = { version = "0.11" } +lz4_flex = { version = "0.11", default-features = false } deadpool-postgres = { version = "0.12.1", optional = true } tokio-postgres = { version = "0.7.10", optional = true } tokio-rustls = { version = "0.25.0", optional = true } diff --git a/crates/store/src/backend/foundationdb/blob.rs b/crates/store/src/backend/foundationdb/blob.rs index eb7e1c10..d1c6ad29 100644 --- a/crates/store/src/backend/foundationdb/blob.rs +++ b/crates/store/src/backend/foundationdb/blob.rs @@ -35,7 +35,7 @@ impl FdbStore { pub(crate) async fn get_blob( &self, key: &[u8], - range: Range, + range: Range, ) -> crate::Result>> { let block_start = range.start as usize / MAX_VALUE_SIZE; let bytes_start = range.start as usize % MAX_VALUE_SIZE; diff --git a/crates/store/src/backend/fs/mod.rs b/crates/store/src/backend/fs/mod.rs index 118bc41c..a0393c24 100644 --- a/crates/store/src/backend/fs/mod.rs +++ b/crates/store/src/backend/fs/mod.rs @@ -59,23 +59,22 @@ impl FsStore { pub(crate) async fn get_blob( &self, key: &[u8], - range: Range, + range: Range, ) -> crate::Result>> { let blob_path = self.build_path(key); let blob_size = match fs::metadata(&blob_path).await { - Ok(m) => m.len(), + Ok(m) => m.len() as usize, Err(_) => return Ok(None), }; let mut blob = File::open(&blob_path).await?; - Ok(Some(if range.start != 0 || range.end != u32::MAX { - let from_offset = if range.start < blob_size as u32 { + Ok(Some(if range.start != 0 || range.end != usize::MAX { + let from_offset = if range.start < blob_size { range.start } else { 0 }; - let mut buf = - vec![0; (std::cmp::min(range.end, blob_size as u32) - from_offset) as usize]; + let mut buf = vec![0; (std::cmp::min(range.end, blob_size) - from_offset) as usize]; if from_offset > 0 { blob.seek(SeekFrom::Start(from_offset as u64)).await?; diff --git a/crates/store/src/backend/mysql/blob.rs b/crates/store/src/backend/mysql/blob.rs index be837c93..92c3cb4c 100644 --- a/crates/store/src/backend/mysql/blob.rs +++ b/crates/store/src/backend/mysql/blob.rs @@ -31,22 +31,19 @@ impl MysqlStore { pub(crate) async fn get_blob( &self, key: &[u8], - range: Range, + range: Range, ) -> crate::Result>> { let mut conn = self.conn_pool.get_conn().await?; let s = conn.prep("SELECT v FROM t WHERE k = ?").await?; conn.exec_first::, _, _>(&s, (key,)) .await .map(|bytes| { - if range.start == 0 && range.end == u32::MAX { + if range.start == 0 && range.end == usize::MAX { bytes } else { bytes.map(|bytes| { bytes - .get( - range.start as usize - ..std::cmp::min(bytes.len(), range.end as usize), - ) + .get(range.start..std::cmp::min(bytes.len(), range.end)) .unwrap_or_default() .to_vec() }) diff --git a/crates/store/src/backend/postgres/blob.rs b/crates/store/src/backend/postgres/blob.rs index cf2902ea..92490052 100644 --- a/crates/store/src/backend/postgres/blob.rs +++ b/crates/store/src/backend/postgres/blob.rs @@ -29,7 +29,7 @@ impl PostgresStore { pub(crate) async fn get_blob( &self, key: &[u8], - range: Range, + range: Range, ) -> crate::Result>> { let conn = self.conn_pool.get().await?; let s = conn.prepare_cached("SELECT v FROM t WHERE k = $1").await?; @@ -37,15 +37,12 @@ impl PostgresStore { .await .and_then(|row| { if let Some(row) = row { - Ok(Some(if range.start == 0 && range.end == u32::MAX { + Ok(Some(if range.start == 0 && range.end == usize::MAX { row.try_get::<_, Vec>(0)? } else { let bytes = row.try_get::<_, &[u8]>(0)?; bytes - .get( - range.start as usize - ..std::cmp::min(bytes.len(), range.end as usize), - ) + .get(range.start..std::cmp::min(bytes.len(), range.end)) .unwrap_or_default() .to_vec() })) diff --git a/crates/store/src/backend/rocksdb/blob.rs b/crates/store/src/backend/rocksdb/blob.rs index d8f34fdf..c9177e54 100644 --- a/crates/store/src/backend/rocksdb/blob.rs +++ b/crates/store/src/backend/rocksdb/blob.rs @@ -29,21 +29,18 @@ impl RocksDbStore { pub(crate) async fn get_blob( &self, key: &[u8], - range: Range, + range: Range, ) -> crate::Result>> { let db = self.db.clone(); self.spawn_worker(move || { db.get_pinned_cf(&db.cf_handle(CF_BLOBS).unwrap(), key) .map(|obj| { obj.map(|bytes| { - if range.start == 0 && range.end == u32::MAX { + if range.start == 0 && range.end == usize::MAX { bytes.to_vec() } else { bytes - .get( - range.start as usize - ..std::cmp::min(bytes.len(), range.end as usize), - ) + .get(range.start..std::cmp::min(bytes.len(), range.end)) .unwrap_or_default() .to_vec() } diff --git a/crates/store/src/backend/s3/mod.rs b/crates/store/src/backend/s3/mod.rs index 547abc86..81afd809 100644 --- a/crates/store/src/backend/s3/mod.rs +++ b/crates/store/src/backend/s3/mod.rs @@ -75,10 +75,10 @@ impl S3Store { pub(crate) async fn get_blob( &self, key: &[u8], - range: Range, + range: Range, ) -> crate::Result>> { let path = self.build_key(key); - let response = if range.start != 0 || range.end != u32::MAX { + let response = if range.start != 0 || range.end != usize::MAX { self.bucket .get_object_range( path, diff --git a/crates/store/src/backend/sqlite/blob.rs b/crates/store/src/backend/sqlite/blob.rs index a098b075..1506724d 100644 --- a/crates/store/src/backend/sqlite/blob.rs +++ b/crates/store/src/backend/sqlite/blob.rs @@ -31,7 +31,7 @@ impl SqliteStore { pub(crate) async fn get_blob( &self, key: &[u8], - range: Range, + range: Range, ) -> crate::Result>> { let conn = self.conn_pool.get()?; self.spawn_worker(move || { @@ -40,14 +40,11 @@ impl SqliteStore { .query_row([&key], |row| { Ok({ let bytes = row.get_ref(0)?.as_bytes()?; - if range.start == 0 && range.end == u32::MAX { + if range.start == 0 && range.end == usize::MAX { bytes.to_vec() } else { bytes - .get( - range.start as usize - ..std::cmp::min(bytes.len(), range.end as usize), - ) + .get(range.start..std::cmp::min(bytes.len(), range.end)) .unwrap_or_default() .to_vec() } diff --git a/crates/store/src/config.rs b/crates/store/src/config.rs index e0eeb360..9d89b9cd 100644 --- a/crates/store/src/config.rs +++ b/crates/store/src/config.rs @@ -28,7 +28,7 @@ use utils::config::{cron::SimpleCron, Config}; use crate::{ backend::{fs::FsStore, memory::MemoryStore}, write::purge::{PurgeSchedule, PurgeStore}, - LookupStore, QueryStore, Store, Stores, + BlobStore, CompressionAlgo, LookupStore, QueryStore, Store, Stores, }; #[cfg(feature = "s3")] @@ -83,6 +83,8 @@ impl ConfigStore for Config { .to_ascii_lowercase(); let prefix = ("store", id); let store_id = id.to_string(); + let compression_algo = + self.property_or_static::(("store", id, "compression"), "lz4")?; let lookup_store: Store = match protocol.as_str() { #[cfg(feature = "rocks")] @@ -92,9 +94,10 @@ impl ConfigStore for Config { config .fts_stores .insert(store_id.clone(), db.clone().into()); - config - .blob_stores - .insert(store_id.clone(), db.clone().into()); + config.blob_stores.insert( + store_id.clone(), + BlobStore::from(db.clone()).with_compression(compression_algo), + ); config.lookup_stores.insert(store_id, db.into()); continue; } @@ -105,9 +108,10 @@ impl ConfigStore for Config { config .fts_stores .insert(store_id.clone(), db.clone().into()); - config - .blob_stores - .insert(store_id.clone(), db.clone().into()); + config.blob_stores.insert( + store_id.clone(), + BlobStore::from(db.clone()).with_compression(compression_algo), + ); config.lookup_stores.insert(store_id, db.into()); continue; } @@ -118,9 +122,10 @@ impl ConfigStore for Config { config .fts_stores .insert(store_id.clone(), db.clone().into()); - config - .blob_stores - .insert(store_id.clone(), db.clone().into()); + config.blob_stores.insert( + store_id.clone(), + BlobStore::from(db.clone()).with_compression(compression_algo), + ); db } #[cfg(feature = "mysql")] @@ -130,9 +135,10 @@ impl ConfigStore for Config { config .fts_stores .insert(store_id.clone(), db.clone().into()); - config - .blob_stores - .insert(store_id.clone(), db.clone().into()); + config.blob_stores.insert( + store_id.clone(), + BlobStore::from(db.clone()).with_compression(compression_algo), + ); db } #[cfg(feature = "sqlite")] @@ -142,22 +148,27 @@ impl ConfigStore for Config { config .fts_stores .insert(store_id.clone(), db.clone().into()); - config - .blob_stores - .insert(store_id.clone(), db.clone().into()); + config.blob_stores.insert( + store_id.clone(), + BlobStore::from(db.clone()).with_compression(compression_algo), + ); db } "fs" => { - config - .blob_stores - .insert(store_id, FsStore::open(self, prefix).await?.into()); + config.blob_stores.insert( + store_id, + BlobStore::from(FsStore::open(self, prefix).await?) + .with_compression(compression_algo), + ); continue; } #[cfg(feature = "s3")] "s3" => { - config - .blob_stores - .insert(store_id, S3Store::open(self, prefix).await?.into()); + config.blob_stores.insert( + store_id, + BlobStore::from(S3Store::open(self, prefix).await?) + .with_compression(compression_algo), + ); continue; } #[cfg(feature = "elastic")] diff --git a/crates/store/src/dispatch/blob.rs b/crates/store/src/dispatch/blob.rs index 005ac17d..a80661b8 100644 --- a/crates/store/src/dispatch/blob.rs +++ b/crates/store/src/dispatch/blob.rs @@ -21,54 +21,110 @@ * for more details. */ -use std::ops::Range; +use std::{borrow::Cow, ops::Range}; -use crate::{BlobStore, Store}; +use utils::config::utils::ParseValue; + +use crate::{BlobBackend, BlobStore, CompressionAlgo, Store}; impl BlobStore { - pub async fn get_blob(&self, key: &[u8], range: Range) -> crate::Result>> { - match self { - Self::Store(store) => match store { + pub async fn get_blob( + &self, + key: &[u8], + range: Range, + ) -> crate::Result>> { + let read_range = match self.compression { + CompressionAlgo::None => range.clone(), + CompressionAlgo::Lz4 => 0..usize::MAX, + }; + + let result = match &self.backend { + BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] - Store::SQLite(store) => store.get_blob(key, range).await, + Store::SQLite(store) => store.get_blob(key, read_range).await, #[cfg(feature = "foundation")] - Store::FoundationDb(store) => store.get_blob(key, range).await, + Store::FoundationDb(store) => store.get_blob(key, read_range).await, #[cfg(feature = "postgres")] - Store::PostgreSQL(store) => store.get_blob(key, range).await, + Store::PostgreSQL(store) => store.get_blob(key, read_range).await, #[cfg(feature = "mysql")] - Store::MySQL(store) => store.get_blob(key, range).await, + Store::MySQL(store) => store.get_blob(key, read_range).await, #[cfg(feature = "rocks")] - Store::RocksDb(store) => store.get_blob(key, range).await, + Store::RocksDb(store) => store.get_blob(key, read_range).await, }, - Self::Fs(store) => store.get_blob(key, range).await, + BlobBackend::Fs(store) => store.get_blob(key, read_range).await, #[cfg(feature = "s3")] - Self::S3(store) => store.get_blob(key, range).await, + BlobBackend::S3(store) => store.get_blob(key, read_range).await, + }; + + let decompressed = match self.compression { + CompressionAlgo::Lz4 => match result? { + Some(data) + if data.last().copied().unwrap_or_default() + == CompressionAlgo::Lz4.marker() => + { + lz4_flex::decompress_size_prepended( + data.get(..data.len() - 1).unwrap_or_default(), + ) + .map_err(|err| { + crate::Error::InternalError(format!( + "Failed to decompress LZ4 data: {}", + err + )) + })? + } + Some(data) => { + tracing::debug!("Warning: Missing LZ4 marker for key: {key:?}"); + data + } + None => return Ok(None), + }, + _ => return result, + }; + + if range.end >= decompressed.len() { + Ok(Some(decompressed)) + } else { + Ok(Some( + decompressed + .get(range.start..range.end) + .unwrap_or_default() + .to_vec(), + )) } } pub async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> { - match self { - Self::Store(store) => match store { + let data: Cow<[u8]> = match self.compression { + CompressionAlgo::None => data.into(), + CompressionAlgo::Lz4 => { + let mut compressed = lz4_flex::compress_prepend_size(data); + compressed.push(CompressionAlgo::Lz4.marker()); + compressed.into() + } + }; + + match &self.backend { + BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] - Store::SQLite(store) => store.put_blob(key, data).await, + Store::SQLite(store) => store.put_blob(key, data.as_ref()).await, #[cfg(feature = "foundation")] - Store::FoundationDb(store) => store.put_blob(key, data).await, + Store::FoundationDb(store) => store.put_blob(key, data.as_ref()).await, #[cfg(feature = "postgres")] - Store::PostgreSQL(store) => store.put_blob(key, data).await, + Store::PostgreSQL(store) => store.put_blob(key, data.as_ref()).await, #[cfg(feature = "mysql")] - Store::MySQL(store) => store.put_blob(key, data).await, + Store::MySQL(store) => store.put_blob(key, data.as_ref()).await, #[cfg(feature = "rocks")] - Store::RocksDb(store) => store.put_blob(key, data).await, + Store::RocksDb(store) => store.put_blob(key, data.as_ref()).await, }, - Self::Fs(store) => store.put_blob(key, data).await, + BlobBackend::Fs(store) => store.put_blob(key, data.as_ref()).await, #[cfg(feature = "s3")] - Self::S3(store) => store.put_blob(key, data).await, + BlobBackend::S3(store) => store.put_blob(key, data.as_ref()).await, } } pub async fn delete_blob(&self, key: &[u8]) -> crate::Result { - match self { - Self::Store(store) => match store { + match &self.backend { + BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] Store::SQLite(store) => store.delete_blob(key).await, #[cfg(feature = "foundation")] @@ -80,9 +136,46 @@ impl BlobStore { #[cfg(feature = "rocks")] Store::RocksDb(store) => store.delete_blob(key).await, }, - Self::Fs(store) => store.delete_blob(key).await, + BlobBackend::Fs(store) => store.delete_blob(key).await, #[cfg(feature = "s3")] - Self::S3(store) => store.delete_blob(key).await, + BlobBackend::S3(store) => store.delete_blob(key).await, + } + } + + pub fn with_compression(self, compression: CompressionAlgo) -> Self { + Self { + backend: self.backend, + compression, + } + } +} + +const MAGIC_MARKER: u8 = 0xa0; + +impl CompressionAlgo { + pub fn marker(&self) -> u8 { + match self { + CompressionAlgo::Lz4 => MAGIC_MARKER | 0x01, + //CompressionAlgo::Zstd => MAGIC_MARKER | 0x02, + CompressionAlgo::None => 0, + } + } +} + +impl ParseValue for CompressionAlgo { + fn parse_value( + key: impl utils::config::utils::AsKey, + value: &str, + ) -> utils::config::Result { + match value { + "lz4" => Ok(CompressionAlgo::Lz4), + //"zstd" => Ok(CompressionAlgo::Zstd), + "none" | "false" | "disable" | "disabled" => Ok(CompressionAlgo::None), + algo => Err(format!( + "Invalid compression algorithm: {} for key {}", + algo, + key.as_key() + )), } } } diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs index 881a599b..1665ed5e 100644 --- a/crates/store/src/dispatch/store.rs +++ b/crates/store/src/dispatch/store.rs @@ -339,7 +339,11 @@ impl Store { Ok(()) } - pub async fn get_blob(&self, key: &[u8], range: Range) -> crate::Result>> { + pub async fn get_blob( + &self, + key: &[u8], + range: Range, + ) -> crate::Result>> { match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.get_blob(key, range).await, diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index d2b48d4d..5e9d7198 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -207,7 +207,19 @@ pub enum Store { } #[derive(Clone)] -pub enum BlobStore { +pub struct BlobStore { + pub backend: BlobBackend, + pub compression: CompressionAlgo, +} + +#[derive(Clone, Copy, Debug)] +pub enum CompressionAlgo { + None, + Lz4, +} + +#[derive(Clone)] +pub enum BlobBackend { Store(Store), Fs(Arc), #[cfg(feature = "s3")] @@ -272,14 +284,20 @@ impl From for Store { impl From for BlobStore { fn from(store: FsStore) -> Self { - Self::Fs(Arc::new(store)) + BlobStore { + backend: BlobBackend::Fs(Arc::new(store)), + compression: CompressionAlgo::None, + } } } #[cfg(feature = "s3")] impl From for BlobStore { fn from(store: S3Store) -> Self { - Self::S3(Arc::new(store)) + BlobStore { + backend: BlobBackend::S3(Arc::new(store)), + compression: CompressionAlgo::None, + } } } @@ -305,7 +323,10 @@ impl From for FtsStore { impl From for BlobStore { fn from(store: Store) -> Self { - Self::Store(store) + BlobStore { + backend: BlobBackend::Store(store), + compression: CompressionAlgo::None, + } } } diff --git a/tests/src/smtp/inbound/mod.rs b/tests/src/smtp/inbound/mod.rs index f073c299..0c6843f9 100644 --- a/tests/src/smtp/inbound/mod.rs +++ b/tests/src/smtp/inbound/mod.rs @@ -362,7 +362,7 @@ impl TestMessage for Message { async fn read_message(&self, core: &QueueReceiver) -> String { String::from_utf8( core.blob_store - .get_blob(self.blob_hash.as_slice(), 0..u32::MAX) + .get_blob(self.blob_hash.as_slice(), 0..usize::MAX) .await .unwrap() .expect("Message blob not found"), diff --git a/tests/src/smtp/mod.rs b/tests/src/smtp/mod.rs index 3fcad2ea..1e353040 100644 --- a/tests/src/smtp/mod.rs +++ b/tests/src/smtp/mod.rs @@ -206,7 +206,7 @@ impl TestConfig for SMTP { blocked_ips: Arc::new(BlockedIps::new(store.clone().into())), }), default_lookup_store: LookupStore::Store(store.clone()), - default_blob_store: store::BlobStore::Store(store.clone()), + default_blob_store: store.clone().into(), default_data_store: store, }, } diff --git a/tests/src/smtp/queue/dsn.rs b/tests/src/smtp/queue/dsn.rs index bc416991..b7c64dbd 100644 --- a/tests/src/smtp/queue/dsn.rs +++ b/tests/src/smtp/queue/dsn.rs @@ -174,7 +174,7 @@ impl QueueReceiver { let bytes = self .blob_store - .get_blob(message.blob_hash.as_slice(), 0..u32::MAX) + .get_blob(message.blob_hash.as_slice(), 0..usize::MAX) .await .unwrap() .unwrap(); diff --git a/tests/src/store/blob.rs b/tests/src/store/blob.rs index a54d9711..b59ea8fe 100644 --- a/tests/src/store/blob.rs +++ b/tests/src/store/blob.rs @@ -93,7 +93,7 @@ pub async fn blob_tests() { // Blob hash should now exist assert!(store.blob_exists(&hash).await.unwrap()); assert!(blob_store - .get_blob(hash.as_ref(), 0..u32::MAX) + .get_blob(hash.as_ref(), 0..usize::MAX) .await .unwrap() .is_some()); @@ -149,7 +149,7 @@ pub async fn blob_tests() { // Blob should no longer be in store assert!(blob_store - .get_blob(hash.as_ref(), 0..u32::MAX) + .get_blob(hash.as_ref(), 0..usize::MAX) .await .unwrap() .is_none()); @@ -269,7 +269,7 @@ pub async fn blob_tests() { assert!(store.blob_exists(&hash).await.unwrap() ^ ct); assert!( blob_store - .get_blob(hash.as_ref(), 0..u32::MAX) + .get_blob(hash.as_ref(), 0..usize::MAX) .await .unwrap() .is_some() @@ -356,7 +356,7 @@ pub async fn blob_tests() { assert!(store.blob_exists(&hash).await.unwrap() ^ ct); assert!( blob_store - .get_blob(hash.as_ref(), 0..u32::MAX) + .get_blob(hash.as_ref(), 0..usize::MAX) .await .unwrap() .is_some() @@ -410,7 +410,7 @@ pub async fn blob_tests() { assert!(store.blob_exists(&hash).await.unwrap() ^ ct); assert!( blob_store - .get_blob(hash.as_ref(), 0..u32::MAX) + .get_blob(hash.as_ref(), 0..usize::MAX) .await .unwrap() .is_some() @@ -430,7 +430,7 @@ async fn test_store(store: BlobStore) { assert_eq!( String::from_utf8( store - .get_blob(hash.as_slice(), 0..u32::MAX) + .get_blob(hash.as_slice(), 0..usize::MAX) .await .unwrap() .unwrap() @@ -451,7 +451,7 @@ async fn test_store(store: BlobStore) { ); assert!(store.delete_blob(hash.as_slice()).await.unwrap()); assert!(store - .get_blob(hash.as_slice(), 0..u32::MAX) + .get_blob(hash.as_slice(), 0..usize::MAX) .await .unwrap() .is_none()); @@ -468,7 +468,7 @@ async fn test_store(store: BlobStore) { assert_eq!( String::from_utf8( store - .get_blob(hash.as_slice(), 0..u32::MAX) + .get_blob(hash.as_slice(), 0..usize::MAX) .await .unwrap() .unwrap() @@ -490,7 +490,7 @@ async fn test_store(store: BlobStore) { ); assert!(store.delete_blob(hash.as_slice()).await.unwrap()); assert!(store - .get_blob(hash.as_slice(), 0..u32::MAX) + .get_blob(hash.as_slice(), 0..usize::MAX) .await .unwrap() .is_none());