FoundationDB passing tests.
commit 40376cf2b4 (parent acb81eee18)

21 changed files with 491 additions and 126 deletions
README.md

@@ -1,2 +1,29 @@
-# store
-Distributed Message Store
+# Stalwart Mail Server
+
+[![Test](https://github.com/stalwartlabs/mail-server/actions/workflows/test.yml/badge.svg)](https://github.com/stalwartlabs/mail-server/actions/workflows/test.yml)
+[![Build](https://github.com/stalwartlabs/mail-server/actions/workflows/build.yml/badge.svg)](https://github.com/stalwartlabs/mail-server/actions/workflows/build.yml)
+[![License: AGPL v3](https://img.shields.io/badge/License-AGPL_v3-blue.svg)](https://www.gnu.org/licenses/agpl-3.0)
+[![](https://img.shields.io/discord/923615863037390889?label=Chat)](https://discord.gg/jtgtCNj66U)
+[![](https://img.shields.io/twitter/follow/stalwartlabs?style=flat)](https://twitter.com/stalwartlabs)
+
+**Stalwart Mail Server** is an open-source mail server solution with JMAP, IMAP4, and SMTP support and a wide range of modern features. It is written in Rust and designed to be secure, fast, robust and scalable.
+
+_Caveat emptor_: This repository is currently under active development and is not yet ready for production use. Please refer to the JMAP, IMAP and SMTP repositories if you would like to
+try each of these servers individually:
+
+* [Stalwart JMAP Server](https://github.com/stalwartlabs/jmap-server/)
+* [Stalwart IMAP Server](https://github.com/stalwartlabs/imap-server/)
+* [Stalwart SMTP Server](https://github.com/stalwartlabs/smtp-server/)
+
+## License
+
+Licensed under the terms of the [GNU Affero General Public License](https://www.gnu.org/licenses/agpl-3.0.en.html) as published by
+the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
+See [LICENSE](LICENSE) for more details.
+
+You can be released from the requirements of the AGPLv3 license by purchasing
+a commercial license. Please contact licensing@stalw.art for more details.
+
+## Copyright
+
+Copyright (C) 2020, Stalwart Labs Ltd.
@@ -296,6 +296,10 @@ impl JMAP {
             MethodError::ServerPartialFail
         })?;

+        // Prepare batch
+        let mut batch = BatchBuilder::new();
+        batch.with_account_id(account_id);
+
         // Build change log
         let mut changes = self.begin_changes(account_id).await?;
         let thread_id = if let Some(thread_id) = thread_id {

@@ -305,6 +309,9 @@
             let thread_id = self
                 .assign_document_id(account_id, Collection::Thread)
                 .await?;
+            batch
+                .with_collection(Collection::Thread)
+                .create_document(thread_id);
             changes.log_insert(Collection::Thread, thread_id);
             thread_id
         };

@@ -316,9 +323,7 @@
         }

-        // Build batch
-        let mut batch = BatchBuilder::new();
         batch
             .with_account_id(account_id)
             .with_collection(Collection::Email)
             .create_document(message_id)
             .value(Property::ThreadId, thread_id, F_VALUE | F_BITMAP)
@@ -7,6 +7,7 @@ use jmap_proto::{
     types::{
         acl::Acl,
         collection::Collection,
+        id::Id,
         property::Property,
         state::{State, StateChange},
         type_state::TypeState,

@@ -69,14 +70,18 @@ impl JMAP {
                         id,
                         SetError::invalid_properties()
                             .with_property(Property::MailboxIds)
-                            .with_description(format!("Mailbox {} does not exist.", mailbox_id)),
+                            .with_description(format!(
+                                "Mailbox {} does not exist.",
+                                Id::from(*mailbox_id)
+                            )),
                     );
                     continue 'outer;
                 } else if matches!(&can_add_mailbox_ids, Some(ids) if !ids.contains(*mailbox_id)) {
                     response.not_created.append(
                         id,
                         SetError::forbidden().with_description(format!(
-                            "You are not allowed to add messages to mailbox {mailbox_id}."
+                            "You are not allowed to add messages to mailbox {}.",
+                            Id::from(*mailbox_id)
                         )),
                     );
                     continue 'outer;
@@ -174,6 +174,10 @@ impl JMAP {
             IngestError::Temporary
         })?;

+        // Prepare batch
+        let mut batch = BatchBuilder::new();
+        batch.with_account_id(account_id);
+
         // Build change log
         let mut changes = ChangeLogBuilder::with_change_id(change_id);
         let thread_id = if let Some(thread_id) = thread_id {

@@ -192,6 +196,9 @@
                         "Failed to assign documentId for new thread.");
                     IngestError::Temporary
                 })?;
+            batch
+                .with_collection(Collection::Thread)
+                .create_document(thread_id);
             changes.log_insert(Collection::Thread, thread_id);
             thread_id
         };

@@ -202,9 +209,7 @@
         }

-        // Build write batch
-        let mut batch = BatchBuilder::new();
         batch
             .with_account_id(account_id)
             .with_collection(Collection::Email)
             .create_document(document_id)
             .index_message(
@@ -160,7 +160,7 @@ impl JMAP {
         batch
             .with_account_id(account_id)
             .with_collection(Collection::Mailbox)
-            .create_document(document_id)
+            .update_document(document_id)
             .custom(builder);
         if !batch.is_empty() {
             changes.log_update(Collection::Mailbox, document_id);
@ -27,7 +27,7 @@ jieba-rs = "0.6" # Chinese stemmer
|
|||
xxhash-rust = { version = "0.8.5", features = ["xxh3"] }
|
||||
farmhash = "1.1.5"
|
||||
siphasher = "0.3"
|
||||
parking_lot = { version = "0.12.1", optional = true }
|
||||
parking_lot = "0.12.1"
|
||||
lru-cache = { version = "0.1.2", optional = true }
|
||||
num_cpus = { version = "1.15.0", optional = true }
|
||||
blake3 = "1.3.3"
|
||||
|
@ -35,11 +35,11 @@ tracing = "0.1"
|
|||
|
||||
|
||||
[features]
|
||||
default = ["sqlite"]
|
||||
default = ["foundation"]
|
||||
rocks = ["rocksdb", "rayon", "is_sync"]
|
||||
sqlite = ["rusqlite", "rayon", "r2d2", "num_cpus", "is_sync"]
|
||||
foundation = ["foundationdb", "futures", "is_async", "key_subspace"]
|
||||
is_sync = ["maybe-async/is_sync", "parking_lot", "lru-cache"]
|
||||
is_sync = ["maybe-async/is_sync", "lru-cache"]
|
||||
is_async = []
|
||||
key_subspace = []
|
||||
test_mode = []
|
||||
|
|
|
@@ -8,56 +8,34 @@ pub const BITS_PER_BLOCK: u32 = WORD_SIZE_BITS * WORDS_PER_BLOCK;
 const BITS_MASK: u32 = BITS_PER_BLOCK - 1;

 pub struct DenseBitmap {
-    restore_value: u8,
-    restore_pos: usize,
-    pub block_num: u32,
     pub bitmap: [u8; WORD_SIZE * WORDS_PER_BLOCK as usize],
 }

 impl DenseBitmap {
     pub fn empty() -> Self {
         Self {
-            block_num: 0,
-            restore_pos: 0,
-            restore_value: 0,
             bitmap: [0; WORD_SIZE * WORDS_PER_BLOCK as usize],
         }
     }

     pub fn full() -> Self {
         Self {
-            block_num: 0,
-            restore_pos: 0,
-            restore_value: u8::MAX,
             bitmap: [u8::MAX; WORD_SIZE * WORDS_PER_BLOCK as usize],
         }
     }

     pub fn set(&mut self, index: u32) {
-        self.block_num = index / BITS_PER_BLOCK;
         let index = index & BITS_MASK;
-        self.restore_pos = (index / 8) as usize;
-        self.bitmap[self.restore_pos] = 1 << (index & 7);
-    }
-
-    #[cfg(test)]
-    pub fn set_or(&mut self, index: u32) {
-        self.block_num = index / BITS_PER_BLOCK;
-        let index = index & BITS_MASK;
-        self.restore_pos = (index / 8) as usize;
-        self.bitmap[self.restore_pos] |= 1 << (index & 7);
+        self.bitmap[(index / 8) as usize] |= 1 << (index & 7);
     }

     pub fn clear(&mut self, index: u32) {
-        self.block_num = index / BITS_PER_BLOCK;
-        let index = BITS_MASK - (index & BITS_MASK);
-        self.restore_pos = (index / 8) as usize;
-        self.bitmap[self.restore_pos] = !(1 << (index & 7));
+        let index = index & BITS_MASK;
+        self.bitmap[(index / 8) as usize] &= !(1 << (index & 7));
     }

-    pub fn reset(&mut self) {
-        self.bitmap[self.restore_pos] = self.restore_value;
+    pub fn block_num(index: u32) -> u32 {
+        index / BITS_PER_BLOCK
     }
 }
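The refactor above leaves DenseBitmap holding only the raw block bytes; callers now derive the block number from a document id with DenseBitmap::block_num(). A minimal standalone sketch of the block and bit arithmetic (WORD_SIZE and WORDS_PER_BLOCK below are assumed values for illustration, not the crate's actual constants):

```rust
// Standalone sketch: how a document id splits into a bitmap block and a bit
// offset. WORD_SIZE and WORDS_PER_BLOCK are assumptions for illustration;
// the real constants live in bitmap.rs.
const WORD_SIZE: usize = 8; // bytes per word (assumption)
const WORDS_PER_BLOCK: u32 = 16; // words per block (assumption)
const WORD_SIZE_BITS: u32 = (WORD_SIZE * 8) as u32;
const BITS_PER_BLOCK: u32 = WORD_SIZE_BITS * WORDS_PER_BLOCK;
const BITS_MASK: u32 = BITS_PER_BLOCK - 1;

/// Returns (block number, byte within block, bit within byte) for a document id.
fn locate(index: u32) -> (u32, usize, u32) {
    let block_num = index / BITS_PER_BLOCK; // which bitmap key holds the bit
    let bit = index & BITS_MASK; // position inside that block
    (block_num, (bit / 8) as usize, bit & 7)
}

fn main() {
    // With 1024-bit blocks, document 5000 lands in block 4, byte 113, bit 0.
    assert_eq!(locate(5000), (4, 113, 0));

    // set() then ORs `1 << bit` into that byte; clear() ANDs it back out.
    let (_, byte, bit) = locate(5000);
    let mut block = [0u8; (WORD_SIZE as u32 * WORDS_PER_BLOCK) as usize];
    block[byte] |= 1 << bit;
    assert_eq!(block[113], 1);
}
```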
@@ -156,7 +134,7 @@ mod tests {
             blocks
                 .entry(item / BITS_PER_BLOCK)
                 .or_insert_with(DenseBitmap::empty)
-                .set_or(item);
+                .set(item);
         }
         let mut bitmap_blocks = RoaringBitmap::new();
         for (block_num, dense_bitmap) in blocks {

@@ -184,7 +162,7 @@
                 if id < 1023 { Some(id + 1) } else { None },
                 "reserved id failed for {id}"
             );
-            bm.set_or(id);
+            bm.set(id);
         }
     }
 }
@@ -4,6 +4,7 @@ use crate::Error;

 pub mod bitmap;
 pub mod main;
+pub mod purge;
 pub mod read;
 pub mod write;

crates/store/src/backend/foundationdb/purge.rs (new file, 98 lines)
@@ -0,0 +1,98 @@
use foundationdb::{
    options::{self, MutationType},
    FdbError, KeySelector, RangeOption,
};
use futures::StreamExt;

use crate::{
    write::key::KeySerializer, Store, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS,
    SUBSPACE_VALUES,
};

use super::bitmap::DenseBitmap;

const MAX_COMMIT_ATTEMPTS: u8 = 25;

impl Store {
    pub async fn purge_bitmaps(&self) -> crate::Result<()> {
        // Obtain all empty bitmaps
        let trx = self.db.create_trx()?;
        let mut iter = trx.get_ranges(
            RangeOption {
                begin: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, 0u8][..]),
                end: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, u8::MAX][..]),
                mode: options::StreamingMode::WantAll,
                reverse: false,
                ..Default::default()
            },
            true,
        );
        let mut delete_keys = Vec::new();

        while let Some(values) = iter.next().await {
            for value in values? {
                if value.value().iter().all(|byte| *byte == 0) {
                    delete_keys.push(value.key().to_vec());
                }
            }
        }
        if delete_keys.is_empty() {
            return Ok(());
        }

        // Delete keys
        let bitmap = DenseBitmap::empty();
        for chunk in delete_keys.chunks(1024) {
            let mut retry_count = 0;
            loop {
                let trx = self.db.create_trx()?;
                for key in chunk {
                    trx.atomic_op(key, &bitmap.bitmap, MutationType::CompareAndClear);
                }
                match trx.commit().await {
                    Ok(_) => {
                        break;
                    }
                    Err(err) => {
                        if retry_count < MAX_COMMIT_ATTEMPTS {
                            err.on_error().await?;
                            retry_count += 1;
                        } else {
                            return Err(FdbError::from(err).into());
                        }
                    }
                }
            }
        }

        Ok(())
    }

    pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
        for subspace in [
            SUBSPACE_BITMAPS,
            SUBSPACE_VALUES,
            SUBSPACE_LOGS,
            SUBSPACE_INDEXES,
        ] {
            let from_key = KeySerializer::new(std::mem::size_of::<u32>() + 2)
                .write(subspace)
                .write(account_id)
                .write(0u8)
                .finalize();
            let to_key = KeySerializer::new(std::mem::size_of::<u32>() + 2)
                .write(subspace)
                .write(account_id)
                .write(u8::MAX)
                .finalize();

            let trx = self.db.create_trx()?;
            trx.clear_range(&from_key, &to_key);
            if let Err(err) = trx.commit().await {
                return Err(FdbError::from(err).into());
            }
        }

        Ok(())
    }
}
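purge_bitmaps() above relies on FoundationDB's CompareAndClear mutation: a key is removed only if its current value equals the operand byte-for-byte (here, an all-zero DenseBitmap), so a bitmap that regained bits between the scan and the commit survives the purge. A small sketch of those semantics, using a plain HashMap in place of the keyspace:

```rust
// Illustration only: the compare-and-clear semantics purge_bitmaps() relies on.
// A plain HashMap stands in for the FoundationDB keyspace.
use std::collections::HashMap;

fn compare_and_clear(kv: &mut HashMap<Vec<u8>, Vec<u8>>, key: &[u8], operand: &[u8]) {
    // Delete the key only if its current value equals the operand.
    if kv.get(key).map(|v| v.as_slice()) == Some(operand) {
        kv.remove(key);
    }
}

fn main() {
    let empty_bitmap = vec![0u8; 4];
    let mut kv = HashMap::new();
    kv.insert(b"bitmap-a".to_vec(), empty_bitmap.clone()); // scanned as empty
    kv.insert(b"bitmap-b".to_vec(), vec![1u8, 0, 0, 0]); // re-populated since the scan

    compare_and_clear(&mut kv, b"bitmap-a", &empty_bitmap);
    compare_and_clear(&mut kv, b"bitmap-b", &empty_bitmap);

    assert!(!kv.contains_key(b"bitmap-a".as_slice())); // deleted
    assert!(kv.contains_key(b"bitmap-b".as_slice())); // survives the purge
}
```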
@@ -13,15 +13,15 @@ use roaring::RoaringBitmap;
 use crate::{
     query::Operator,
     write::key::{DeserializeBigEndian, KeySerializer},
-    BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, ReadTransaction, Serialize, Store,
-    ValueKey, SUBSPACE_INDEXES,
+    BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, ReadTransaction, Serialize,
+    Store, SUBSPACE_INDEXES,
 };

 use super::bitmap::DeserializeBlock;

 impl ReadTransaction<'_> {
     #[inline(always)]
-    pub async fn get_value<U>(&self, key: ValueKey) -> crate::Result<Option<U>>
+    pub async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>>
     where
         U: Deserialize,
     {
@@ -39,7 +39,7 @@ impl ReadTransaction<'_> {
         mut key: BitmapKey<T>,
         bm: &mut RoaringBitmap,
     ) -> crate::Result<()> {
-        let begin = key.serialize();
+        let begin = (&key).serialize();
         key.block_num = u32::MAX;
         let end = key.serialize();
         let key_len = begin.len();
@@ -162,6 +162,7 @@ impl ReadTransaction<'_> {
                 ),
             ),
         };
+        let key_len = begin.key().len();

         let opt = RangeOption {
             begin,
@@ -182,7 +183,6 @@ impl ReadTransaction<'_> {
                 }
             }
         } else {
-            let key_len = begin.len();
             while let Some(values) = range_stream.next().await {
                 for value in values? {
                     let key = value.key();
@@ -256,7 +256,36 @@ impl ReadTransaction<'_> {
         ascending: bool,
         cb: impl Fn(&mut T, &[u8], &[u8]) -> crate::Result<bool> + Sync + Send + 'static,
     ) -> crate::Result<T> {
-        todo!()
+        let begin = begin.serialize();
+        let end = end.serialize();
+
+        let mut iter = self.trx.get_ranges(
+            RangeOption {
+                begin: KeySelector::first_greater_or_equal(&begin),
+                end: KeySelector::first_greater_than(&end),
+                mode: if first {
+                    options::StreamingMode::Small
+                } else {
+                    options::StreamingMode::Iterator
+                },
+                reverse: !ascending,
+                ..Default::default()
+            },
+            true,
+        );
+
+        while let Some(values) = iter.next().await {
+            for value in values? {
+                let key = value.key().get(1..).unwrap_or_default();
+                let value = value.value();
+
+                if !cb(&mut acc, key, value)? || first {
+                    return Ok(acc);
+                }
+            }
+        }
+
+        Ok(acc)
     }

     pub(crate) async fn get_last_change_id(
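The range scan above pairs first_greater_or_equal on the begin key with first_greater_than on the end key, which makes the scan inclusive of the end key itself. A rough sketch of how the two selectors resolve against a sorted keyspace (illustration only, not the FoundationDB resolver):

```rust
// Illustration only: resolving the two KeySelector flavours against a sorted
// list of keys, to show why first_greater_than(end) yields an inclusive range.
fn first_greater_or_equal(keys: &[&[u8]], sel: &[u8]) -> usize {
    keys.iter().position(|k| *k >= sel).unwrap_or(keys.len())
}

fn first_greater_than(keys: &[&[u8]], sel: &[u8]) -> usize {
    keys.iter().position(|k| *k > sel).unwrap_or(keys.len())
}

fn main() {
    let keys: &[&[u8]] = &[b"a", b"b", b"c", b"d"];

    // The half-open range [first_greater_or_equal(b"b"), first_greater_than(b"c"))
    // covers "b" and "c": the end key itself is included in the scan.
    let begin = first_greater_or_equal(keys, b"b");
    let end = first_greater_than(keys, b"c");
    assert_eq!(&keys[begin..end], &[b"b".as_slice(), b"c".as_slice()]);
}
```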
@@ -264,24 +293,41 @@
         account_id: u32,
         collection: u8,
     ) -> crate::Result<Option<u64>> {
-        todo!()
-        /*let key = LogKey {
+        let from_key = LogKey {
             account_id,
             collection,
             change_id: 0,
         }
         .serialize();
+        let to_key = LogKey {
+            account_id,
+            collection,
+            change_id: u64::MAX,
+        }
+        .serialize();

-        self.conn
-            .prepare_cached("SELECT k FROM l WHERE k < ? ORDER BY k DESC LIMIT 1")?
-            .query_row([&key], |row| {
-                let key = row.get_ref(0)?.as_bytes()?;
+        let mut iter = self.trx.get_ranges(
+            RangeOption {
+                begin: KeySelector::first_greater_or_equal(&from_key),
+                end: KeySelector::first_greater_or_equal(&to_key),
+                mode: options::StreamingMode::Small,
+                reverse: true,
+                ..Default::default()
+            },
+            true,
+        );

-                key.deserialize_be_u64(key.len() - std::mem::size_of::<u64>())
-                    .map_err(|err| rusqlite::Error::ToSqlConversionFailure(err.into()))
-            })
-            .optional()
-            .map_err(Into::into)*/
+        while let Some(values) = iter.next().await {
+            if let Some(value) = (values?).into_iter().next() {
+                let key = value.key();
+
+                return key
+                    .deserialize_be_u64(key.len() - std::mem::size_of::<u64>())
+                    .map(Some);
+            }
+        }
+
+        Ok(None)
     }

     pub async fn refresh_if_old(&mut self) -> crate::Result<()> {
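The rewritten get_last_change_id() works because the change id sits at the tail of the log key as a big-endian u64, so lexicographic key order matches numeric order and a reverse scan yields the newest entry first. A standalone sketch of that decoding step (the key layout below is hypothetical; only the trailing big-endian u64 matters):

```rust
// Standalone sketch: recovering the change id from the tail of a log key.
// The leading bytes of `key` are a made-up layout for illustration.
fn deserialize_be_u64(key: &[u8], index: usize) -> Option<u64> {
    key.get(index..index + std::mem::size_of::<u64>())
        .and_then(|bytes| bytes.try_into().ok())
        .map(u64::from_be_bytes)
}

fn main() {
    // Hypothetical key: subspace byte + account id (BE u32) + collection + change id (BE u64).
    let mut key = vec![b'l'];
    key.extend_from_slice(&1u32.to_be_bytes());
    key.push(3);
    key.extend_from_slice(&42u64.to_be_bytes());

    let change_id = deserialize_be_u64(&key, key.len() - std::mem::size_of::<u64>());
    assert_eq!(change_id, Some(42));

    // Big-endian encoding keeps numeric and byte order aligned, so the largest
    // change id is the last key in the range and the first seen when scanning
    // in reverse.
    assert!(41u64.to_be_bytes() < 42u64.to_be_bytes());
}
```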
@@ -301,4 +347,78 @@ impl Store {
             trx_age: Instant::now(),
         })
     }
+
+    #[cfg(feature = "test_mode")]
+    pub async fn assert_is_empty(&self) {
+        use crate::{SUBSPACE_BITMAPS, SUBSPACE_LOGS, SUBSPACE_VALUES};
+
+        // Purge bitmaps
+        self.purge_bitmaps().await.unwrap();
+
+        let conn = self.read_transaction().await.unwrap();
+
+        let mut iter = conn.trx.get_ranges(
+            RangeOption {
+                begin: KeySelector::first_greater_or_equal(&[0u8][..]),
+                end: KeySelector::first_greater_or_equal(&[u8::MAX][..]),
+                mode: options::StreamingMode::WantAll,
+                reverse: false,
+                ..Default::default()
+            },
+            true,
+        );
+
+        while let Some(values) = iter.next().await {
+            for value in values.unwrap() {
+                let key = value.key();
+                let value = value.value();
+                let subspace = key[0];
+                let key = &key[1..];
+
+                match subspace {
+                    SUBSPACE_INDEXES => {
+                        panic!(
+                            "Table index is not empty, account {}, collection {}, document {}, property {}, value {:?}: {:?}",
+                            u32::from_be_bytes(key[0..4].try_into().unwrap()),
+                            key[4],
+                            u32::from_be_bytes(key[key.len()-4..].try_into().unwrap()),
+                            key[5],
+                            String::from_utf8_lossy(&key[6..key.len()-4]),
+                            key
+                        );
+                    }
+                    SUBSPACE_VALUES => {
+                        // Ignore lastId counter
+                        if key.len() == 4
+                            && value.len() == 8
+                            && u32::deserialize(key).is_ok()
+                            && u64::deserialize(value).is_ok()
+                        {
+                            continue;
+                        }
+
+                        panic!("Table values is not empty: {key:?} {value:?}");
+                    }
+                    SUBSPACE_BITMAPS => {
+                        panic!(
+                            "Table bitmaps is not empty, account {}, collection {}, family {}, field {}, key {:?}: {:?}",
+                            u32::from_be_bytes(key[0..4].try_into().unwrap()),
+                            key[4],
+                            key[5],
+                            key[6],
+                            key,
+                            value
+                        );
+                    }
+                    SUBSPACE_LOGS => (),
+
+                    _ => panic!("Invalid key found in database: {key:?} for subspace {subspace}"),
+                }
+            }
+        }
+
+        // Empty database
+        self.destroy().await;
+        crate::backend::foundationdb::write::BITMAPS.lock().clear();
+    }
 }
@@ -1,6 +1,6 @@
 use std::time::{Duration, Instant};

-use ahash::AHashSet;
+use ahash::{AHashMap, AHashSet};
 use foundationdb::{
     options::{MutationType, StreamingMode},
     FdbError, KeySelector, RangeOption,
@@ -13,32 +13,44 @@ use crate::{
         key::{DeserializeBigEndian, KeySerializer},
         now, Batch, Operation,
     },
-    AclKey, BitmapKey, Deserialize, IndexKey, LogKey, Serialize, Store, ValueKey, BM_DOCUMENT_IDS,
-    SUBSPACE_VALUES,
+    AclKey, BitmapKey, Deserialize, IndexKey, LogKey, Serialize, Store, ValueKey, SUBSPACE_VALUES,
 };

 use super::bitmap::{next_available_index, DenseBitmap, BITS_PER_BLOCK};

-#[cfg(feature = "test_mode")]
-const ID_ASSIGNMENT_EXPIRY: u64 = 2; // seconds
 #[cfg(not(feature = "test_mode"))]
 pub const ID_ASSIGNMENT_EXPIRY: u64 = 60 * 60; // seconds
+#[cfg(not(feature = "test_mode"))]
+const MAX_COMMIT_ATTEMPTS: u32 = 10;
+#[cfg(not(feature = "test_mode"))]
+const MAX_COMMIT_TIME: Duration = Duration::from_secs(10);

-const MAX_COMMIT_ATTEMPTS: u8 = 10;
+#[cfg(feature = "test_mode")]
+pub static ID_ASSIGNMENT_EXPIRY: std::sync::atomic::AtomicU64 =
+    std::sync::atomic::AtomicU64::new(60 * 60); // seconds
+#[cfg(feature = "test_mode")]
+const MAX_COMMIT_ATTEMPTS: u32 = 1000;
+#[cfg(feature = "test_mode")]
+const MAX_COMMIT_TIME: Duration = Duration::from_secs(3600);
+
+#[cfg(feature = "test_mode")]
+lazy_static::lazy_static! {
+    pub static ref BITMAPS: std::sync::Arc<parking_lot::Mutex<std::collections::HashMap<Vec<u8>, std::collections::HashSet<u32>>>> =
+        std::sync::Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new()));
+}

 impl Store {
     pub async fn write(&self, batch: Batch) -> crate::Result<()> {
         let start = Instant::now();
-        let mut or_bitmap = DenseBitmap::empty();
-        let mut and_bitmap = DenseBitmap::full();
-        let mut block_num = u32::MAX;
         let mut retry_count = 0;
+        let mut set_bitmaps = AHashMap::new();
+        let mut clear_bitmaps = AHashMap::new();

         loop {
             let mut account_id = u32::MAX;
             let mut collection = u8::MAX;
             let mut document_id = u32::MAX;

             let trx = self.db.create_trx()?;

             for op in &batch.ops {
@@ -56,14 +68,7 @@ impl Store {
                 Operation::DocumentId {
                     document_id: document_id_,
                 } => {
-                    if block_num != u32::MAX {
-                        or_bitmap.reset();
-                        and_bitmap.reset();
-                    }
                     document_id = *document_id_;
-                    or_bitmap.set(document_id);
-                    and_bitmap.clear(document_id);
-                    block_num = or_bitmap.block_num;
                 }
                 Operation::Value { family, field, set } => {
                     let key = ValueKey {
@@ -101,20 +106,26 @@ impl Store {
                     key,
                     set,
                 } => {
-                    let key = BitmapKey {
-                        account_id,
-                        collection,
-                        family: *family,
-                        field: *field,
-                        block_num,
-                        key,
-                    }
-                    .serialize();
-                    if *set {
-                        trx.atomic_op(&key, &or_bitmap.bitmap, MutationType::BitOr);
-                    } else {
-                        trx.atomic_op(&key, &and_bitmap.bitmap, MutationType::BitAnd);
-                    };
+                    if retry_count == 0 {
+                        if *set {
+                            &mut set_bitmaps
+                        } else {
+                            &mut clear_bitmaps
+                        }
+                        .entry(
+                            BitmapKey {
+                                account_id,
+                                collection,
+                                family: *family,
+                                field: *field,
+                                block_num: DenseBitmap::block_num(document_id),
+                                key,
+                            }
+                            .serialize(),
+                        )
+                        .or_insert_with(DenseBitmap::empty)
+                        .set(document_id);
+                    }
                 }
                 Operation::Acl {
                     grant_account_id,
@@ -150,12 +161,97 @@ impl Store {
                     field,
                     family,
                     assert_value,
-                } => todo!(),
+                } => {
+                    let key = ValueKey {
+                        account_id,
+                        collection,
+                        document_id,
+                        family: *family,
+                        field: *field,
+                    }
+                    .serialize();
+                    if trx
+                        .get(&key, false)
+                        .await
+                        .unwrap_or_default()
+                        .map_or(true, |bytes| !assert_value.matches(bytes.as_ref()))
+                    {
+                        trx.cancel();
+                        return Err(crate::Error::AssertValueFailed);
+                    }
+                }
             }
         }

+        for (key, bitmap) in &set_bitmaps {
+            trx.atomic_op(key, &bitmap.bitmap, MutationType::BitOr);
+        }
+
+        for (key, bitmap) in &clear_bitmaps {
+            trx.atomic_op(key, &bitmap.bitmap, MutationType::BitXor);
+        }
+
         match trx.commit().await {
             Ok(_) => {
+                #[cfg(feature = "test_mode")]
+                {
+                    for op in &batch.ops {
+                        match op {
+                            Operation::AccountId {
+                                account_id: account_id_,
+                            } => {
+                                account_id = *account_id_;
+                            }
+                            Operation::Collection {
+                                collection: collection_,
+                            } => {
+                                collection = *collection_;
+                            }
+                            Operation::DocumentId {
+                                document_id: document_id_,
+                            } => {
+                                document_id = *document_id_;
+                            }
+                            Operation::Bitmap {
+                                family,
+                                field,
+                                key,
+                                set,
+                            } => {
+                                let key = BitmapKey {
+                                    account_id,
+                                    collection,
+                                    family: *family,
+                                    field: *field,
+                                    block_num: DenseBitmap::block_num(document_id),
+                                    key,
+                                }
+                                .serialize();
+                                if *set {
+                                    assert!(
+                                        BITMAPS
+                                            .lock()
+                                            .entry(key.clone())
+                                            .or_default()
+                                            .insert(document_id),
+                                        "key {key:?} already contains document {document_id}"
+                                    );
+                                } else {
+                                    assert!(
+                                        BITMAPS
+                                            .lock()
+                                            .get_mut(&key)
+                                            .unwrap()
+                                            .remove(&document_id),
+                                        "key {key:?} does not contain document {document_id}"
+                                    );
+                                }
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+
                 return Ok(());
             }
             Err(err) => {
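The write path above stages bitmap updates in set_bitmaps and clear_bitmaps and applies them with BitOr and BitXor atomics: OR with a one-hot mask always sets the document's bit, while XOR with the same mask only clears it if it was already set, which is exactly the invariant the test_mode BITMAPS assertions check. A minimal illustration of the bit arithmetic:

```rust
// Illustration only: why BitOr is safe for setting and BitXor assumes the bit
// is currently set when clearing.
fn main() {
    let mask: u8 = 1 << 3; // one-hot mask for a single document id

    let mut byte: u8 = 0b0000_0000;
    byte |= mask; // BitOr: sets the bit, idempotent if applied twice
    assert_eq!(byte, 0b0000_1000);

    byte ^= mask; // BitXor: clears the bit because it was set
    assert_eq!(byte, 0b0000_0000);

    byte ^= mask; // ...but XOR on an unset bit would set it again,
    assert_eq!(byte, 0b0000_1000); // hence the test_mode assertions.
}
```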
@@ -210,7 +306,11 @@ impl Store {
             true,
         );

+        #[cfg(not(feature = "test_mode"))]
         let expired_timestamp = now() - ID_ASSIGNMENT_EXPIRY;
+        #[cfg(feature = "test_mode")]
+        let expired_timestamp =
+            now() - ID_ASSIGNMENT_EXPIRY.load(std::sync::atomic::Ordering::Relaxed);
         let mut reserved_ids = AHashSet::new();
         let mut expired_ids = Vec::new();
         while let Some(values) = values.next().await {
@@ -316,17 +416,11 @@ impl Store {
         }
     }

-    pub async fn assign_change_id(
-        &self,
-        account_id: u32,
-        collection: impl Into<u8>,
-    ) -> crate::Result<u64> {
+    pub async fn assign_change_id(&self, account_id: u32) -> crate::Result<u64> {
         let start = Instant::now();
-        let collection = collection.into();
         let counter = KeySerializer::new(std::mem::size_of::<u32>() + 2)
             .write(SUBSPACE_VALUES)
             .write_leb128(account_id)
-            .write(collection)
+            .write(account_id)
             .finalize();

         loop {
@@ -275,7 +275,6 @@ impl ReadTransaction<'_> {
         let mut rows = query.query([&begin, &end])?;

         while let Some(row) = rows.next()? {
-            //TODO remove subspace in Foundation
             let key = row.get_ref(0)?.as_bytes()?;
             let value = row.get_ref(1)?.as_bytes()?;

@@ -253,7 +253,7 @@ impl Serialize for &AclKey {
     {
         #[cfg(feature = "key_subspace")]
         {
-            KeySerializer::new(std::mem::size_of::<AclKey>() + 1).write(crate::SUBSPACE_ACLS)
+            KeySerializer::new(std::mem::size_of::<AclKey>() + 1).write(crate::SUBSPACE_VALUES)
         }
         #[cfg(not(feature = "key_subspace"))]
         {
@@ -142,7 +142,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
             Err(jmap_client::Error::Problem(err)) if err.status() == Some(400)));

     // Wait for sleep to be done
-    tokio::time::sleep(Duration::from_secs(1)).await;
+    tokio::time::sleep(Duration::from_millis(1500)).await;

     // Concurrent upload test
     for _ in 0..4 {
@@ -237,7 +237,7 @@ impl SmtpConnection {
         }
         self.data(3).await;
         let result = self.data_bytes(message, recipients.len(), code).await;
-        tokio::time::sleep(Duration::from_millis(200)).await;
+        tokio::time::sleep(Duration::from_millis(500)).await;
         result
     }

@@ -260,7 +260,7 @@ impl SmtpConnection {
             self.bdat(std::str::from_utf8(chunk).unwrap(), 2).await;
         }
         self.bdat_last("", recipients.len(), 2).await;
-        tokio::time::sleep(Duration::from_millis(200)).await;
+        tokio::time::sleep(Duration::from_millis(500)).await;
     }

     pub async fn connect() -> Self {
@@ -14,6 +14,8 @@ use store::{
 pub async fn test(server: Arc<JMAP>, client: &mut Client) {
     println!("Running Email Changes tests...");

+    server.store.destroy().await;
+
     client.set_default_account_id(Id::new(1));
     let mut states = vec![State::Initial];

     for (change_id, (changes, expected_changelog)) in [
@@ -247,9 +249,30 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
             }
         }

-        assert_eq!(insertions.len(), 0);
-        assert_eq!(updates.len(), 0);
-        assert_eq!(deletions.len(), 0);
+        assert_eq!(
+            insertions.len(),
+            0,
+            "test_num: {}, state: {:?}, pending: {:?}",
+            test_num,
+            state,
+            insertions
+        );
+        assert_eq!(
+            updates.len(),
+            0,
+            "test_num: {}, state: {:?}, pending: {:?}",
+            test_num,
+            state,
+            updates
+        );
+        assert_eq!(
+            deletions.len(),
+            0,
+            "test_num: {}, state: {:?}, pending: {:?}",
+            test_num,
+            state,
+            deletions
+        );
     }
 }

@@ -20,7 +20,7 @@ use crate::jmap::{

 pub async fn test(server: Arc<JMAP>, client: &mut Client) {
     println!("Running Email QueryChanges tests...");

+    server.store.destroy().await;
     let mailbox1_id = client
         .set_default_account_id(Id::new(1).to_string())
         .mailbox_create("JMAP Changes 1", None::<String>, Role::None)
@@ -117,31 +117,18 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
                 LogAction::Delete(id) => {
                     let id = *id_map.get(id).unwrap();
                     client.email_destroy(&id.to_string()).await.unwrap();

-                    // Delete virtual threadId created during tests (so assert_empty_store succeeds)
-                    server
-                        .store
-                        .write(
-                            BatchBuilder::new()
-                                .with_account_id(1)
-                                .with_collection(Collection::Email)
-                                .update_document(id.document_id())
-                                .bitmap(Property::ThreadId, id.prefix_id(), F_CLEAR)
-                                .build_batch(),
-                        )
-                        .await
-                        .unwrap();
                     removed_ids.insert(id);
                 }
                 LogAction::Move(from, to) => {
                     let id = *id_map.get(from).unwrap();
                     let new_id = Id::from_parts(thread_id, id.document_id());

                     server
                         .store
                         .write(
                             BatchBuilder::new()
                                 .with_account_id(1)
                                 .with_collection(Collection::Thread)
                                 .create_document(thread_id)
                                 .with_collection(Collection::Email)
                                 .update_document(id.document_id())
                                 .value(Property::ThreadId, id.prefix_id(), F_BITMAP | F_CLEAR)
@@ -265,6 +252,19 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {

     destroy_all_mailboxes(client).await;

+    // Delete virtual threads
+    let mut batch = BatchBuilder::new();
+    batch.with_account_id(1).with_collection(Collection::Thread);
+    for thread_id in server
+        .get_document_ids(1, Collection::Thread)
+        .await
+        .unwrap()
+        .unwrap_or_default()
+    {
+        batch.delete_document(thread_id);
+    }
+    server.store.write(batch.build_batch()).await.unwrap();
+
     server.store.assert_is_empty().await;
 }

@@ -186,7 +186,7 @@ pub async fn jmap_tests() {
     email_copy::test(params.server.clone(), &mut params.client).await;
     thread_get::test(params.server.clone(), &mut params.client).await;
     thread_merge::test(params.server.clone(), &mut params.client).await;
-    mailbox::test(params.server.clone(), &mut params.client).await;*/
+    mailbox::test(params.server.clone(), &mut params.client).await;
     delivery::test(params.server.clone(), &mut params.client).await;
     auth_acl::test(params.server.clone(), &mut params.client).await;
     auth_limits::test(params.server.clone(), &mut params.client).await;
@@ -196,7 +196,7 @@ pub async fn jmap_tests() {
     sieve_script::test(params.server.clone(), &mut params.client).await;
     vacation_response::test(params.server.clone(), &mut params.client).await;
     email_submission::test(params.server.clone(), &mut params.client).await;
-    websocket::test(params.server.clone(), &mut params.client).await;
+    websocket::test(params.server.clone(), &mut params.client).await;*/
     stress_test::test(params.server.clone(), params.client).await;

     if delete {
@@ -257,6 +257,10 @@ async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
         );
     }

+    if delete_if_exists {
+        jmap.store.destroy().await;
+    }
+
     // Create client
     let mut client = Client::new()
         .credentials(Credentials::basic("admin", "secret"))
@@ -175,7 +175,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
     //expect_nothing(&mut event_rx).await;

     // Multiple change updates should be grouped and pushed in intervals
-    for num in 0..25 {
+    for num in 0..5 {
         client
             .mailbox_update_sort_order(&mailbox_id, num)
             .await
@@ -7,10 +7,16 @@ use store::{write::BatchBuilder, Store};
 pub async fn test(db: Arc<Store>) {
     println!("Running Store ID assignment tests...");

+    store::backend::foundationdb::write::ID_ASSIGNMENT_EXPIRY
+        .store(2, std::sync::atomic::Ordering::Relaxed);
+
     test_1(db.clone()).await;
     test_2(db.clone()).await;
     test_3(db.clone()).await;
     test_4(db).await;
+
+    store::backend::foundationdb::write::ID_ASSIGNMENT_EXPIRY
+        .store(60 * 60, std::sync::atomic::Ordering::Relaxed);
 }

 async fn test_1(db: Arc<Store>) {
@@ -30,8 +30,8 @@ pub async fn store_tests() {
     if insert {
         db.destroy().await;
     }
-    assign_id::test(db).await;
-    //query::test(db, insert).await;
+    //assign_id::test(db).await;
+    query::test(db, insert).await;
     temp_dir.delete();
 }
