mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2024-11-28 09:07:32 +00:00
LZ4 compress blobs by default (closes #227)
This commit is contained in:
parent
62a4f70ac8
commit
f989f4f750
28 changed files with 256 additions and 144 deletions
3
Cargo.lock
generated
3
Cargo.lock
generated
|
@ -3134,9 +3134,6 @@ name = "lz4_flex"
|
|||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "912b45c753ff5f7f5208307e8ace7d2a2e30d024e26d3509f3dce546c044ce15"
|
||||
dependencies = [
|
||||
"twox-hash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mail-auth"
|
||||
|
|
|
@ -318,7 +318,7 @@ impl<T: SessionStream> SessionData<T> {
|
|||
// Fetch and parse blob
|
||||
let raw_message = if needs_blobs {
|
||||
// Retrieve raw message if needed
|
||||
match self.jmap.get_blob(&email.blob_hash, 0..u32::MAX).await {
|
||||
match self.jmap.get_blob(&email.blob_hash, 0..usize::MAX).await {
|
||||
Ok(Some(raw_message)) => raw_message,
|
||||
Ok(None) => {
|
||||
tracing::warn!(event = "not-found",
|
||||
|
|
|
@ -50,7 +50,7 @@ rasn-cms = "0.10"
|
|||
rasn-pkix = "0.10"
|
||||
rsa = "0.9.2"
|
||||
async-trait = "0.1.68"
|
||||
lz4_flex = { version = "0.11" }
|
||||
lz4_flex = { version = "0.11", default-features = false }
|
||||
|
||||
[dev-dependencies]
|
||||
ece = "2.2"
|
||||
|
|
|
@ -102,7 +102,7 @@ impl JMAP {
|
|||
if let Some(section) = &blob_id.section {
|
||||
self.get_blob_section(&blob_id.hash, section).await
|
||||
} else {
|
||||
self.get_blob(&blob_id.hash, 0..u32::MAX).await
|
||||
self.get_blob(&blob_id.hash, 0..usize::MAX).await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,8 +114,7 @@ impl JMAP {
|
|||
Ok(self
|
||||
.get_blob(
|
||||
hash,
|
||||
(section.offset_start as u32)
|
||||
..(section.offset_start.saturating_add(section.size) as u32),
|
||||
(section.offset_start)..(section.offset_start.saturating_add(section.size)),
|
||||
)
|
||||
.await?
|
||||
.and_then(|bytes| match Encoding::from(section.encoding) {
|
||||
|
@ -128,7 +127,7 @@ impl JMAP {
|
|||
pub async fn get_blob(
|
||||
&self,
|
||||
hash: &BlobHash,
|
||||
range: Range<u32>,
|
||||
range: Range<usize>,
|
||||
) -> Result<Option<Vec<u8>>, MethodError> {
|
||||
match self.blob_store.get_blob(hash.as_ref(), range).await {
|
||||
Ok(blob) => Ok(blob),
|
||||
|
|
|
@ -95,22 +95,19 @@ impl JMAP {
|
|||
continue 'outer;
|
||||
}
|
||||
|
||||
let offset = offset.unwrap_or(0) as u32;
|
||||
let offset = offset.unwrap_or(0);
|
||||
let length = length
|
||||
.map(|length| (length as u32).saturating_add(offset))
|
||||
.unwrap_or(u32::MAX);
|
||||
.map(|length| length.saturating_add(offset))
|
||||
.unwrap_or(usize::MAX);
|
||||
let bytes = if let Some(section) = &id.section {
|
||||
self.get_blob_section(&id.hash, section)
|
||||
.await?
|
||||
.map(|bytes| {
|
||||
if offset == 0 && length == u32::MAX {
|
||||
if offset == 0 && length == usize::MAX {
|
||||
bytes
|
||||
} else {
|
||||
bytes
|
||||
.get(
|
||||
offset as usize
|
||||
..std::cmp::min(length as usize, bytes.len()),
|
||||
)
|
||||
.get(offset..std::cmp::min(length, bytes.len()))
|
||||
.unwrap_or_default()
|
||||
.to_vec()
|
||||
}
|
||||
|
|
|
@ -169,7 +169,8 @@ impl JMAP {
|
|||
|
||||
// Retrieve raw message if needed
|
||||
let raw_message = if needs_body {
|
||||
if let Some(raw_message) = self.get_blob(&metadata.blob_hash, 0..u32::MAX).await? {
|
||||
if let Some(raw_message) = self.get_blob(&metadata.blob_hash, 0..usize::MAX).await?
|
||||
{
|
||||
raw_message
|
||||
} else {
|
||||
tracing::warn!(event = "not-found",
|
||||
|
|
|
@ -150,19 +150,20 @@ impl JMAP {
|
|||
snippet.preview = body.into();
|
||||
} else {*/
|
||||
// Download message
|
||||
let raw_message =
|
||||
if let Some(raw_message) = self.get_blob(&metadata.blob_hash, 0..u32::MAX).await? {
|
||||
raw_message
|
||||
} else {
|
||||
tracing::warn!(event = "not-found",
|
||||
let raw_message = if let Some(raw_message) =
|
||||
self.get_blob(&metadata.blob_hash, 0..usize::MAX).await?
|
||||
{
|
||||
raw_message
|
||||
} else {
|
||||
tracing::warn!(event = "not-found",
|
||||
account_id = account_id,
|
||||
collection = ?Collection::Email,
|
||||
document_id = email_id.document_id(),
|
||||
blob_id = ?metadata.blob_hash,
|
||||
"Blob not found");
|
||||
response.not_found.push(email_id);
|
||||
continue;
|
||||
};
|
||||
response.not_found.push(email_id);
|
||||
continue;
|
||||
};
|
||||
|
||||
// Find a matching part
|
||||
'outer: for part in &metadata.contents.parts {
|
||||
|
|
|
@ -95,8 +95,9 @@ impl JMAP {
|
|||
if metadata.inner.blob_hash.as_slice() == blob_hash.as_slice() =>
|
||||
{
|
||||
// Obtain raw message
|
||||
let raw_message = if let Ok(Some(raw_message)) =
|
||||
self.get_blob(&metadata.inner.blob_hash, 0..u32::MAX).await
|
||||
let raw_message = if let Ok(Some(raw_message)) = self
|
||||
.get_blob(&metadata.inner.blob_hash, 0..usize::MAX)
|
||||
.await
|
||||
{
|
||||
raw_message
|
||||
} else {
|
||||
|
|
|
@ -34,7 +34,7 @@ impl JMAP {
|
|||
// Read message
|
||||
let raw_message = match self
|
||||
.blob_store
|
||||
.get_blob(message.message_blob.as_slice(), 0..u32::MAX)
|
||||
.get_blob(message.message_blob.as_slice(), 0..usize::MAX)
|
||||
.await
|
||||
{
|
||||
Ok(Some(raw_message)) => raw_message,
|
||||
|
|
|
@ -226,7 +226,7 @@ impl JMAP {
|
|||
|
||||
// Obtain the sieve script blob
|
||||
let script_bytes = self
|
||||
.get_blob(&blob_id.hash, 0..u32::MAX)
|
||||
.get_blob(&blob_id.hash, 0..usize::MAX)
|
||||
.await?
|
||||
.ok_or(MethodError::ServerPartialFail)?;
|
||||
|
||||
|
|
|
@ -559,22 +559,22 @@ impl JMAP {
|
|||
);
|
||||
|
||||
// Obtain raw message
|
||||
let message = if let Some(message) = self.get_blob(&metadata.blob_hash, 0..u32::MAX).await?
|
||||
{
|
||||
if message.len() > self.config.mail_max_size {
|
||||
return Ok(Err(SetError::new(SetErrorType::InvalidEmail)
|
||||
.with_description(format!(
|
||||
"Message exceeds maximum size of {} bytes.",
|
||||
self.config.mail_max_size
|
||||
))));
|
||||
}
|
||||
let message =
|
||||
if let Some(message) = self.get_blob(&metadata.blob_hash, 0..usize::MAX).await? {
|
||||
if message.len() > self.config.mail_max_size {
|
||||
return Ok(Err(SetError::new(SetErrorType::InvalidEmail)
|
||||
.with_description(format!(
|
||||
"Message exceeds maximum size of {} bytes.",
|
||||
self.config.mail_max_size
|
||||
))));
|
||||
}
|
||||
|
||||
message
|
||||
} else {
|
||||
return Ok(Err(SetError::invalid_properties()
|
||||
.with_property(Property::EmailId)
|
||||
.with_description("Blob for email not found.")));
|
||||
};
|
||||
message
|
||||
} else {
|
||||
return Ok(Err(SetError::invalid_properties()
|
||||
.with_property(Property::EmailId)
|
||||
.with_description("Blob for email not found.")));
|
||||
};
|
||||
|
||||
// Begin local SMTP session
|
||||
let mut session =
|
||||
|
|
|
@ -537,7 +537,7 @@ pub async fn send_message<T: AsyncRead + AsyncWrite + Unpin>(
|
|||
.core
|
||||
.shared
|
||||
.default_blob_store
|
||||
.get_blob(message.blob_hash.as_slice(), 0..u32::MAX)
|
||||
.get_blob(message.blob_hash.as_slice(), 0..usize::MAX)
|
||||
.await
|
||||
{
|
||||
Ok(Some(raw_message)) => tokio::time::timeout(params.timeout_data, async {
|
||||
|
|
|
@ -27,7 +27,7 @@ lru-cache = { version = "0.1.2", optional = true }
|
|||
num_cpus = { version = "1.15.0", optional = true }
|
||||
blake3 = "1.3.3"
|
||||
tracing = "0.1"
|
||||
lz4_flex = { version = "0.11" }
|
||||
lz4_flex = { version = "0.11", default-features = false }
|
||||
deadpool-postgres = { version = "0.12.1", optional = true }
|
||||
tokio-postgres = { version = "0.7.10", optional = true }
|
||||
tokio-rustls = { version = "0.25.0", optional = true }
|
||||
|
|
|
@ -35,7 +35,7 @@ impl FdbStore {
|
|||
pub(crate) async fn get_blob(
|
||||
&self,
|
||||
key: &[u8],
|
||||
range: Range<u32>,
|
||||
range: Range<usize>,
|
||||
) -> crate::Result<Option<Vec<u8>>> {
|
||||
let block_start = range.start as usize / MAX_VALUE_SIZE;
|
||||
let bytes_start = range.start as usize % MAX_VALUE_SIZE;
|
||||
|
|
|
@ -59,23 +59,22 @@ impl FsStore {
|
|||
pub(crate) async fn get_blob(
|
||||
&self,
|
||||
key: &[u8],
|
||||
range: Range<u32>,
|
||||
range: Range<usize>,
|
||||
) -> crate::Result<Option<Vec<u8>>> {
|
||||
let blob_path = self.build_path(key);
|
||||
let blob_size = match fs::metadata(&blob_path).await {
|
||||
Ok(m) => m.len(),
|
||||
Ok(m) => m.len() as usize,
|
||||
Err(_) => return Ok(None),
|
||||
};
|
||||
let mut blob = File::open(&blob_path).await?;
|
||||
|
||||
Ok(Some(if range.start != 0 || range.end != u32::MAX {
|
||||
let from_offset = if range.start < blob_size as u32 {
|
||||
Ok(Some(if range.start != 0 || range.end != usize::MAX {
|
||||
let from_offset = if range.start < blob_size {
|
||||
range.start
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let mut buf =
|
||||
vec![0; (std::cmp::min(range.end, blob_size as u32) - from_offset) as usize];
|
||||
let mut buf = vec![0; (std::cmp::min(range.end, blob_size) - from_offset) as usize];
|
||||
|
||||
if from_offset > 0 {
|
||||
blob.seek(SeekFrom::Start(from_offset as u64)).await?;
|
||||
|
|
|
@ -31,22 +31,19 @@ impl MysqlStore {
|
|||
pub(crate) async fn get_blob(
|
||||
&self,
|
||||
key: &[u8],
|
||||
range: Range<u32>,
|
||||
range: Range<usize>,
|
||||
) -> crate::Result<Option<Vec<u8>>> {
|
||||
let mut conn = self.conn_pool.get_conn().await?;
|
||||
let s = conn.prep("SELECT v FROM t WHERE k = ?").await?;
|
||||
conn.exec_first::<Vec<u8>, _, _>(&s, (key,))
|
||||
.await
|
||||
.map(|bytes| {
|
||||
if range.start == 0 && range.end == u32::MAX {
|
||||
if range.start == 0 && range.end == usize::MAX {
|
||||
bytes
|
||||
} else {
|
||||
bytes.map(|bytes| {
|
||||
bytes
|
||||
.get(
|
||||
range.start as usize
|
||||
..std::cmp::min(bytes.len(), range.end as usize),
|
||||
)
|
||||
.get(range.start..std::cmp::min(bytes.len(), range.end))
|
||||
.unwrap_or_default()
|
||||
.to_vec()
|
||||
})
|
||||
|
|
|
@ -29,7 +29,7 @@ impl PostgresStore {
|
|||
pub(crate) async fn get_blob(
|
||||
&self,
|
||||
key: &[u8],
|
||||
range: Range<u32>,
|
||||
range: Range<usize>,
|
||||
) -> crate::Result<Option<Vec<u8>>> {
|
||||
let conn = self.conn_pool.get().await?;
|
||||
let s = conn.prepare_cached("SELECT v FROM t WHERE k = $1").await?;
|
||||
|
@ -37,15 +37,12 @@ impl PostgresStore {
|
|||
.await
|
||||
.and_then(|row| {
|
||||
if let Some(row) = row {
|
||||
Ok(Some(if range.start == 0 && range.end == u32::MAX {
|
||||
Ok(Some(if range.start == 0 && range.end == usize::MAX {
|
||||
row.try_get::<_, Vec<u8>>(0)?
|
||||
} else {
|
||||
let bytes = row.try_get::<_, &[u8]>(0)?;
|
||||
bytes
|
||||
.get(
|
||||
range.start as usize
|
||||
..std::cmp::min(bytes.len(), range.end as usize),
|
||||
)
|
||||
.get(range.start..std::cmp::min(bytes.len(), range.end))
|
||||
.unwrap_or_default()
|
||||
.to_vec()
|
||||
}))
|
||||
|
|
|
@ -29,21 +29,18 @@ impl RocksDbStore {
|
|||
pub(crate) async fn get_blob(
|
||||
&self,
|
||||
key: &[u8],
|
||||
range: Range<u32>,
|
||||
range: Range<usize>,
|
||||
) -> crate::Result<Option<Vec<u8>>> {
|
||||
let db = self.db.clone();
|
||||
self.spawn_worker(move || {
|
||||
db.get_pinned_cf(&db.cf_handle(CF_BLOBS).unwrap(), key)
|
||||
.map(|obj| {
|
||||
obj.map(|bytes| {
|
||||
if range.start == 0 && range.end == u32::MAX {
|
||||
if range.start == 0 && range.end == usize::MAX {
|
||||
bytes.to_vec()
|
||||
} else {
|
||||
bytes
|
||||
.get(
|
||||
range.start as usize
|
||||
..std::cmp::min(bytes.len(), range.end as usize),
|
||||
)
|
||||
.get(range.start..std::cmp::min(bytes.len(), range.end))
|
||||
.unwrap_or_default()
|
||||
.to_vec()
|
||||
}
|
||||
|
|
|
@ -75,10 +75,10 @@ impl S3Store {
|
|||
pub(crate) async fn get_blob(
|
||||
&self,
|
||||
key: &[u8],
|
||||
range: Range<u32>,
|
||||
range: Range<usize>,
|
||||
) -> crate::Result<Option<Vec<u8>>> {
|
||||
let path = self.build_key(key);
|
||||
let response = if range.start != 0 || range.end != u32::MAX {
|
||||
let response = if range.start != 0 || range.end != usize::MAX {
|
||||
self.bucket
|
||||
.get_object_range(
|
||||
path,
|
||||
|
|
|
@ -31,7 +31,7 @@ impl SqliteStore {
|
|||
pub(crate) async fn get_blob(
|
||||
&self,
|
||||
key: &[u8],
|
||||
range: Range<u32>,
|
||||
range: Range<usize>,
|
||||
) -> crate::Result<Option<Vec<u8>>> {
|
||||
let conn = self.conn_pool.get()?;
|
||||
self.spawn_worker(move || {
|
||||
|
@ -40,14 +40,11 @@ impl SqliteStore {
|
|||
.query_row([&key], |row| {
|
||||
Ok({
|
||||
let bytes = row.get_ref(0)?.as_bytes()?;
|
||||
if range.start == 0 && range.end == u32::MAX {
|
||||
if range.start == 0 && range.end == usize::MAX {
|
||||
bytes.to_vec()
|
||||
} else {
|
||||
bytes
|
||||
.get(
|
||||
range.start as usize
|
||||
..std::cmp::min(bytes.len(), range.end as usize),
|
||||
)
|
||||
.get(range.start..std::cmp::min(bytes.len(), range.end))
|
||||
.unwrap_or_default()
|
||||
.to_vec()
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ use utils::config::{cron::SimpleCron, Config};
|
|||
use crate::{
|
||||
backend::{fs::FsStore, memory::MemoryStore},
|
||||
write::purge::{PurgeSchedule, PurgeStore},
|
||||
LookupStore, QueryStore, Store, Stores,
|
||||
BlobStore, CompressionAlgo, LookupStore, QueryStore, Store, Stores,
|
||||
};
|
||||
|
||||
#[cfg(feature = "s3")]
|
||||
|
@ -83,6 +83,8 @@ impl ConfigStore for Config {
|
|||
.to_ascii_lowercase();
|
||||
let prefix = ("store", id);
|
||||
let store_id = id.to_string();
|
||||
let compression_algo =
|
||||
self.property_or_static::<CompressionAlgo>(("store", id, "compression"), "lz4")?;
|
||||
|
||||
let lookup_store: Store = match protocol.as_str() {
|
||||
#[cfg(feature = "rocks")]
|
||||
|
@ -92,9 +94,10 @@ impl ConfigStore for Config {
|
|||
config
|
||||
.fts_stores
|
||||
.insert(store_id.clone(), db.clone().into());
|
||||
config
|
||||
.blob_stores
|
||||
.insert(store_id.clone(), db.clone().into());
|
||||
config.blob_stores.insert(
|
||||
store_id.clone(),
|
||||
BlobStore::from(db.clone()).with_compression(compression_algo),
|
||||
);
|
||||
config.lookup_stores.insert(store_id, db.into());
|
||||
continue;
|
||||
}
|
||||
|
@ -105,9 +108,10 @@ impl ConfigStore for Config {
|
|||
config
|
||||
.fts_stores
|
||||
.insert(store_id.clone(), db.clone().into());
|
||||
config
|
||||
.blob_stores
|
||||
.insert(store_id.clone(), db.clone().into());
|
||||
config.blob_stores.insert(
|
||||
store_id.clone(),
|
||||
BlobStore::from(db.clone()).with_compression(compression_algo),
|
||||
);
|
||||
config.lookup_stores.insert(store_id, db.into());
|
||||
continue;
|
||||
}
|
||||
|
@ -118,9 +122,10 @@ impl ConfigStore for Config {
|
|||
config
|
||||
.fts_stores
|
||||
.insert(store_id.clone(), db.clone().into());
|
||||
config
|
||||
.blob_stores
|
||||
.insert(store_id.clone(), db.clone().into());
|
||||
config.blob_stores.insert(
|
||||
store_id.clone(),
|
||||
BlobStore::from(db.clone()).with_compression(compression_algo),
|
||||
);
|
||||
db
|
||||
}
|
||||
#[cfg(feature = "mysql")]
|
||||
|
@ -130,9 +135,10 @@ impl ConfigStore for Config {
|
|||
config
|
||||
.fts_stores
|
||||
.insert(store_id.clone(), db.clone().into());
|
||||
config
|
||||
.blob_stores
|
||||
.insert(store_id.clone(), db.clone().into());
|
||||
config.blob_stores.insert(
|
||||
store_id.clone(),
|
||||
BlobStore::from(db.clone()).with_compression(compression_algo),
|
||||
);
|
||||
db
|
||||
}
|
||||
#[cfg(feature = "sqlite")]
|
||||
|
@ -142,22 +148,27 @@ impl ConfigStore for Config {
|
|||
config
|
||||
.fts_stores
|
||||
.insert(store_id.clone(), db.clone().into());
|
||||
config
|
||||
.blob_stores
|
||||
.insert(store_id.clone(), db.clone().into());
|
||||
config.blob_stores.insert(
|
||||
store_id.clone(),
|
||||
BlobStore::from(db.clone()).with_compression(compression_algo),
|
||||
);
|
||||
db
|
||||
}
|
||||
"fs" => {
|
||||
config
|
||||
.blob_stores
|
||||
.insert(store_id, FsStore::open(self, prefix).await?.into());
|
||||
config.blob_stores.insert(
|
||||
store_id,
|
||||
BlobStore::from(FsStore::open(self, prefix).await?)
|
||||
.with_compression(compression_algo),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
#[cfg(feature = "s3")]
|
||||
"s3" => {
|
||||
config
|
||||
.blob_stores
|
||||
.insert(store_id, S3Store::open(self, prefix).await?.into());
|
||||
config.blob_stores.insert(
|
||||
store_id,
|
||||
BlobStore::from(S3Store::open(self, prefix).await?)
|
||||
.with_compression(compression_algo),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
#[cfg(feature = "elastic")]
|
||||
|
|
|
@ -21,54 +21,110 @@
|
|||
* for more details.
|
||||
*/
|
||||
|
||||
use std::ops::Range;
|
||||
use std::{borrow::Cow, ops::Range};
|
||||
|
||||
use crate::{BlobStore, Store};
|
||||
use utils::config::utils::ParseValue;
|
||||
|
||||
use crate::{BlobBackend, BlobStore, CompressionAlgo, Store};
|
||||
|
||||
impl BlobStore {
|
||||
pub async fn get_blob(&self, key: &[u8], range: Range<u32>) -> crate::Result<Option<Vec<u8>>> {
|
||||
match self {
|
||||
Self::Store(store) => match store {
|
||||
pub async fn get_blob(
|
||||
&self,
|
||||
key: &[u8],
|
||||
range: Range<usize>,
|
||||
) -> crate::Result<Option<Vec<u8>>> {
|
||||
let read_range = match self.compression {
|
||||
CompressionAlgo::None => range.clone(),
|
||||
CompressionAlgo::Lz4 => 0..usize::MAX,
|
||||
};
|
||||
|
||||
let result = match &self.backend {
|
||||
BlobBackend::Store(store) => match store {
|
||||
#[cfg(feature = "sqlite")]
|
||||
Store::SQLite(store) => store.get_blob(key, range).await,
|
||||
Store::SQLite(store) => store.get_blob(key, read_range).await,
|
||||
#[cfg(feature = "foundation")]
|
||||
Store::FoundationDb(store) => store.get_blob(key, range).await,
|
||||
Store::FoundationDb(store) => store.get_blob(key, read_range).await,
|
||||
#[cfg(feature = "postgres")]
|
||||
Store::PostgreSQL(store) => store.get_blob(key, range).await,
|
||||
Store::PostgreSQL(store) => store.get_blob(key, read_range).await,
|
||||
#[cfg(feature = "mysql")]
|
||||
Store::MySQL(store) => store.get_blob(key, range).await,
|
||||
Store::MySQL(store) => store.get_blob(key, read_range).await,
|
||||
#[cfg(feature = "rocks")]
|
||||
Store::RocksDb(store) => store.get_blob(key, range).await,
|
||||
Store::RocksDb(store) => store.get_blob(key, read_range).await,
|
||||
},
|
||||
Self::Fs(store) => store.get_blob(key, range).await,
|
||||
BlobBackend::Fs(store) => store.get_blob(key, read_range).await,
|
||||
#[cfg(feature = "s3")]
|
||||
Self::S3(store) => store.get_blob(key, range).await,
|
||||
BlobBackend::S3(store) => store.get_blob(key, read_range).await,
|
||||
};
|
||||
|
||||
let decompressed = match self.compression {
|
||||
CompressionAlgo::Lz4 => match result? {
|
||||
Some(data)
|
||||
if data.last().copied().unwrap_or_default()
|
||||
== CompressionAlgo::Lz4.marker() =>
|
||||
{
|
||||
lz4_flex::decompress_size_prepended(
|
||||
data.get(..data.len() - 1).unwrap_or_default(),
|
||||
)
|
||||
.map_err(|err| {
|
||||
crate::Error::InternalError(format!(
|
||||
"Failed to decompress LZ4 data: {}",
|
||||
err
|
||||
))
|
||||
})?
|
||||
}
|
||||
Some(data) => {
|
||||
tracing::debug!("Warning: Missing LZ4 marker for key: {key:?}");
|
||||
data
|
||||
}
|
||||
None => return Ok(None),
|
||||
},
|
||||
_ => return result,
|
||||
};
|
||||
|
||||
if range.end >= decompressed.len() {
|
||||
Ok(Some(decompressed))
|
||||
} else {
|
||||
Ok(Some(
|
||||
decompressed
|
||||
.get(range.start..range.end)
|
||||
.unwrap_or_default()
|
||||
.to_vec(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> {
|
||||
match self {
|
||||
Self::Store(store) => match store {
|
||||
let data: Cow<[u8]> = match self.compression {
|
||||
CompressionAlgo::None => data.into(),
|
||||
CompressionAlgo::Lz4 => {
|
||||
let mut compressed = lz4_flex::compress_prepend_size(data);
|
||||
compressed.push(CompressionAlgo::Lz4.marker());
|
||||
compressed.into()
|
||||
}
|
||||
};
|
||||
|
||||
match &self.backend {
|
||||
BlobBackend::Store(store) => match store {
|
||||
#[cfg(feature = "sqlite")]
|
||||
Store::SQLite(store) => store.put_blob(key, data).await,
|
||||
Store::SQLite(store) => store.put_blob(key, data.as_ref()).await,
|
||||
#[cfg(feature = "foundation")]
|
||||
Store::FoundationDb(store) => store.put_blob(key, data).await,
|
||||
Store::FoundationDb(store) => store.put_blob(key, data.as_ref()).await,
|
||||
#[cfg(feature = "postgres")]
|
||||
Store::PostgreSQL(store) => store.put_blob(key, data).await,
|
||||
Store::PostgreSQL(store) => store.put_blob(key, data.as_ref()).await,
|
||||
#[cfg(feature = "mysql")]
|
||||
Store::MySQL(store) => store.put_blob(key, data).await,
|
||||
Store::MySQL(store) => store.put_blob(key, data.as_ref()).await,
|
||||
#[cfg(feature = "rocks")]
|
||||
Store::RocksDb(store) => store.put_blob(key, data).await,
|
||||
Store::RocksDb(store) => store.put_blob(key, data.as_ref()).await,
|
||||
},
|
||||
Self::Fs(store) => store.put_blob(key, data).await,
|
||||
BlobBackend::Fs(store) => store.put_blob(key, data.as_ref()).await,
|
||||
#[cfg(feature = "s3")]
|
||||
Self::S3(store) => store.put_blob(key, data).await,
|
||||
BlobBackend::S3(store) => store.put_blob(key, data.as_ref()).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_blob(&self, key: &[u8]) -> crate::Result<bool> {
|
||||
match self {
|
||||
Self::Store(store) => match store {
|
||||
match &self.backend {
|
||||
BlobBackend::Store(store) => match store {
|
||||
#[cfg(feature = "sqlite")]
|
||||
Store::SQLite(store) => store.delete_blob(key).await,
|
||||
#[cfg(feature = "foundation")]
|
||||
|
@ -80,9 +136,46 @@ impl BlobStore {
|
|||
#[cfg(feature = "rocks")]
|
||||
Store::RocksDb(store) => store.delete_blob(key).await,
|
||||
},
|
||||
Self::Fs(store) => store.delete_blob(key).await,
|
||||
BlobBackend::Fs(store) => store.delete_blob(key).await,
|
||||
#[cfg(feature = "s3")]
|
||||
Self::S3(store) => store.delete_blob(key).await,
|
||||
BlobBackend::S3(store) => store.delete_blob(key).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_compression(self, compression: CompressionAlgo) -> Self {
|
||||
Self {
|
||||
backend: self.backend,
|
||||
compression,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const MAGIC_MARKER: u8 = 0xa0;
|
||||
|
||||
impl CompressionAlgo {
|
||||
pub fn marker(&self) -> u8 {
|
||||
match self {
|
||||
CompressionAlgo::Lz4 => MAGIC_MARKER | 0x01,
|
||||
//CompressionAlgo::Zstd => MAGIC_MARKER | 0x02,
|
||||
CompressionAlgo::None => 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ParseValue for CompressionAlgo {
|
||||
fn parse_value(
|
||||
key: impl utils::config::utils::AsKey,
|
||||
value: &str,
|
||||
) -> utils::config::Result<Self> {
|
||||
match value {
|
||||
"lz4" => Ok(CompressionAlgo::Lz4),
|
||||
//"zstd" => Ok(CompressionAlgo::Zstd),
|
||||
"none" | "false" | "disable" | "disabled" => Ok(CompressionAlgo::None),
|
||||
algo => Err(format!(
|
||||
"Invalid compression algorithm: {} for key {}",
|
||||
algo,
|
||||
key.as_key()
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -339,7 +339,11 @@ impl Store {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_blob(&self, key: &[u8], range: Range<u32>) -> crate::Result<Option<Vec<u8>>> {
|
||||
pub async fn get_blob(
|
||||
&self,
|
||||
key: &[u8],
|
||||
range: Range<usize>,
|
||||
) -> crate::Result<Option<Vec<u8>>> {
|
||||
match self {
|
||||
#[cfg(feature = "sqlite")]
|
||||
Self::SQLite(store) => store.get_blob(key, range).await,
|
||||
|
|
|
@ -207,7 +207,19 @@ pub enum Store {
|
|||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum BlobStore {
|
||||
pub struct BlobStore {
|
||||
pub backend: BlobBackend,
|
||||
pub compression: CompressionAlgo,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum CompressionAlgo {
|
||||
None,
|
||||
Lz4,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum BlobBackend {
|
||||
Store(Store),
|
||||
Fs(Arc<FsStore>),
|
||||
#[cfg(feature = "s3")]
|
||||
|
@ -272,14 +284,20 @@ impl From<RocksDbStore> for Store {
|
|||
|
||||
impl From<FsStore> for BlobStore {
|
||||
fn from(store: FsStore) -> Self {
|
||||
Self::Fs(Arc::new(store))
|
||||
BlobStore {
|
||||
backend: BlobBackend::Fs(Arc::new(store)),
|
||||
compression: CompressionAlgo::None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "s3")]
|
||||
impl From<S3Store> for BlobStore {
|
||||
fn from(store: S3Store) -> Self {
|
||||
Self::S3(Arc::new(store))
|
||||
BlobStore {
|
||||
backend: BlobBackend::S3(Arc::new(store)),
|
||||
compression: CompressionAlgo::None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -305,7 +323,10 @@ impl From<Store> for FtsStore {
|
|||
|
||||
impl From<Store> for BlobStore {
|
||||
fn from(store: Store) -> Self {
|
||||
Self::Store(store)
|
||||
BlobStore {
|
||||
backend: BlobBackend::Store(store),
|
||||
compression: CompressionAlgo::None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -362,7 +362,7 @@ impl TestMessage for Message {
|
|||
async fn read_message(&self, core: &QueueReceiver) -> String {
|
||||
String::from_utf8(
|
||||
core.blob_store
|
||||
.get_blob(self.blob_hash.as_slice(), 0..u32::MAX)
|
||||
.get_blob(self.blob_hash.as_slice(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("Message blob not found"),
|
||||
|
|
|
@ -206,7 +206,7 @@ impl TestConfig for SMTP {
|
|||
blocked_ips: Arc::new(BlockedIps::new(store.clone().into())),
|
||||
}),
|
||||
default_lookup_store: LookupStore::Store(store.clone()),
|
||||
default_blob_store: store::BlobStore::Store(store.clone()),
|
||||
default_blob_store: store.clone().into(),
|
||||
default_data_store: store,
|
||||
},
|
||||
}
|
||||
|
|
|
@ -174,7 +174,7 @@ impl QueueReceiver {
|
|||
|
||||
let bytes = self
|
||||
.blob_store
|
||||
.get_blob(message.blob_hash.as_slice(), 0..u32::MAX)
|
||||
.get_blob(message.blob_hash.as_slice(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
|
|
@ -93,7 +93,7 @@ pub async fn blob_tests() {
|
|||
// Blob hash should now exist
|
||||
assert!(store.blob_exists(&hash).await.unwrap());
|
||||
assert!(blob_store
|
||||
.get_blob(hash.as_ref(), 0..u32::MAX)
|
||||
.get_blob(hash.as_ref(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_some());
|
||||
|
@ -149,7 +149,7 @@ pub async fn blob_tests() {
|
|||
|
||||
// Blob should no longer be in store
|
||||
assert!(blob_store
|
||||
.get_blob(hash.as_ref(), 0..u32::MAX)
|
||||
.get_blob(hash.as_ref(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none());
|
||||
|
@ -269,7 +269,7 @@ pub async fn blob_tests() {
|
|||
assert!(store.blob_exists(&hash).await.unwrap() ^ ct);
|
||||
assert!(
|
||||
blob_store
|
||||
.get_blob(hash.as_ref(), 0..u32::MAX)
|
||||
.get_blob(hash.as_ref(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_some()
|
||||
|
@ -356,7 +356,7 @@ pub async fn blob_tests() {
|
|||
assert!(store.blob_exists(&hash).await.unwrap() ^ ct);
|
||||
assert!(
|
||||
blob_store
|
||||
.get_blob(hash.as_ref(), 0..u32::MAX)
|
||||
.get_blob(hash.as_ref(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_some()
|
||||
|
@ -410,7 +410,7 @@ pub async fn blob_tests() {
|
|||
assert!(store.blob_exists(&hash).await.unwrap() ^ ct);
|
||||
assert!(
|
||||
blob_store
|
||||
.get_blob(hash.as_ref(), 0..u32::MAX)
|
||||
.get_blob(hash.as_ref(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_some()
|
||||
|
@ -430,7 +430,7 @@ async fn test_store(store: BlobStore) {
|
|||
assert_eq!(
|
||||
String::from_utf8(
|
||||
store
|
||||
.get_blob(hash.as_slice(), 0..u32::MAX)
|
||||
.get_blob(hash.as_slice(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
|
@ -451,7 +451,7 @@ async fn test_store(store: BlobStore) {
|
|||
);
|
||||
assert!(store.delete_blob(hash.as_slice()).await.unwrap());
|
||||
assert!(store
|
||||
.get_blob(hash.as_slice(), 0..u32::MAX)
|
||||
.get_blob(hash.as_slice(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none());
|
||||
|
@ -468,7 +468,7 @@ async fn test_store(store: BlobStore) {
|
|||
assert_eq!(
|
||||
String::from_utf8(
|
||||
store
|
||||
.get_blob(hash.as_slice(), 0..u32::MAX)
|
||||
.get_blob(hash.as_slice(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
|
@ -490,7 +490,7 @@ async fn test_store(store: BlobStore) {
|
|||
);
|
||||
assert!(store.delete_blob(hash.as_slice()).await.unwrap());
|
||||
assert!(store
|
||||
.get_blob(hash.as_slice(), 0..u32::MAX)
|
||||
.get_blob(hash.as_slice(), 0..usize::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none());
|
||||
|
|
Loading…
Reference in a new issue