From 40376cf2b4f297b114764375c16599569192e1dd Mon Sep 17 00:00:00 2001 From: Mauro D Date: Fri, 26 May 2023 14:57:04 +0000 Subject: [PATCH] FoundationDB passing tests. --- README.md | 31 +++- crates/jmap/src/email/copy.rs | 9 +- crates/jmap/src/email/import.rs | 9 +- crates/jmap/src/email/ingest.rs | 9 +- crates/jmap/src/mailbox/set.rs | 2 +- crates/store/Cargo.toml | 6 +- .../store/src/backend/foundationdb/bitmap.rs | 36 +--- crates/store/src/backend/foundationdb/mod.rs | 1 + .../store/src/backend/foundationdb/purge.rs | 98 ++++++++++ crates/store/src/backend/foundationdb/read.rs | 154 ++++++++++++++-- .../store/src/backend/foundationdb/write.rs | 172 ++++++++++++++---- crates/store/src/backend/sqlite/read.rs | 1 - crates/store/src/write/key.rs | 2 +- tests/src/jmap/auth_limits.rs | 2 +- tests/src/jmap/delivery.rs | 4 +- tests/src/jmap/email_changes.rs | 29 ++- tests/src/jmap/email_query_changes.rs | 32 ++-- tests/src/jmap/mod.rs | 8 +- tests/src/jmap/push_subscription.rs | 2 +- tests/src/store/assign_id.rs | 6 + tests/src/store/mod.rs | 4 +- 21 files changed, 491 insertions(+), 126 deletions(-) create mode 100644 crates/store/src/backend/foundationdb/purge.rs diff --git a/README.md b/README.md index a1ee576d..7f194c6f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,29 @@ -# store -Distributed Message Store +# Stalwart Mail Server + +[![Test](https://github.com/stalwartlabs/mail-server/actions/workflows/test.yml/badge.svg)](https://github.com/stalwartlabs/mail-server/actions/workflows/test.yml) +[![Build](https://github.com/stalwartlabs/mail-server/actions/workflows/build.yml/badge.svg)](https://github.com/stalwartlabs/mail-server/actions/workflows/build.yml) +[![License: AGPL v3](https://img.shields.io/badge/License-AGPL_v3-blue.svg)](https://www.gnu.org/licenses/agpl-3.0) +[![](https://img.shields.io/discord/923615863037390889?label=Chat)](https://discord.gg/jtgtCNj66U) +[![](https://img.shields.io/twitter/follow/stalwartlabs?style=flat)](https://twitter.com/stalwartlabs) + +**Stalwart Mail Server** is an open-source mail server solution with JMAP, IMAP4, and SMTP support and a wide range of modern features. It is written in Rust and designed to be secure, fast, robust and scalable. + +_Caveat emptor_: This repository is currently under active development and is not yet ready for production use. Please refer to the JMAP, IMAP and SMTP repositories if you would like to +try each of these servers individually: + +* [Stalwart JMAP Server](https://github.com/stalwartlabs/jmap-server/) +* [Stalwart IMAP Server](https://github.com/stalwartlabs/imap-server/) +* [Stalwart SMTP Server](https://github.com/stalwartlabs/smtp-server/) + +## License + +Licensed under the terms of the [GNU Affero General Public License](https://www.gnu.org/licenses/agpl-3.0.en.html) as published by +the Free Software Foundation, either version 3 of the License, or (at your option) any later version. +See [LICENSE](LICENSE) for more details. + +You can be released from the requirements of the AGPLv3 license by purchasing +a commercial license. Please contact licensing@stalw.art for more details. + +## Copyright + +Copyright (C) 2020, Stalwart Labs Ltd. diff --git a/crates/jmap/src/email/copy.rs b/crates/jmap/src/email/copy.rs index ec95bc9b..13f182ca 100644 --- a/crates/jmap/src/email/copy.rs +++ b/crates/jmap/src/email/copy.rs @@ -296,6 +296,10 @@ impl JMAP { MethodError::ServerPartialFail })?; + // Prepare batch + let mut batch = BatchBuilder::new(); + batch.with_account_id(account_id); + // Build change log let mut changes = self.begin_changes(account_id).await?; let thread_id = if let Some(thread_id) = thread_id { @@ -305,6 +309,9 @@ impl JMAP { let thread_id = self .assign_document_id(account_id, Collection::Thread) .await?; + batch + .with_collection(Collection::Thread) + .create_document(thread_id); changes.log_insert(Collection::Thread, thread_id); thread_id }; @@ -316,9 +323,7 @@ impl JMAP { } // Build batch - let mut batch = BatchBuilder::new(); batch - .with_account_id(account_id) .with_collection(Collection::Email) .create_document(message_id) .value(Property::ThreadId, thread_id, F_VALUE | F_BITMAP) diff --git a/crates/jmap/src/email/import.rs b/crates/jmap/src/email/import.rs index 62fd8175..a6e56f3d 100644 --- a/crates/jmap/src/email/import.rs +++ b/crates/jmap/src/email/import.rs @@ -7,6 +7,7 @@ use jmap_proto::{ types::{ acl::Acl, collection::Collection, + id::Id, property::Property, state::{State, StateChange}, type_state::TypeState, @@ -69,14 +70,18 @@ impl JMAP { id, SetError::invalid_properties() .with_property(Property::MailboxIds) - .with_description(format!("Mailbox {} does not exist.", mailbox_id)), + .with_description(format!( + "Mailbox {} does not exist.", + Id::from(*mailbox_id) + )), ); continue 'outer; } else if matches!(&can_add_mailbox_ids, Some(ids) if !ids.contains(*mailbox_id)) { response.not_created.append( id, SetError::forbidden().with_description(format!( - "You are not allowed to add messages to mailbox {mailbox_id}." + "You are not allowed to add messages to mailbox {}.", + Id::from(*mailbox_id) )), ); continue 'outer; diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs index f131dfc5..73dc18dd 100644 --- a/crates/jmap/src/email/ingest.rs +++ b/crates/jmap/src/email/ingest.rs @@ -174,6 +174,10 @@ impl JMAP { IngestError::Temporary })?; + // Prepare batch + let mut batch = BatchBuilder::new(); + batch.with_account_id(account_id); + // Build change log let mut changes = ChangeLogBuilder::with_change_id(change_id); let thread_id = if let Some(thread_id) = thread_id { @@ -192,6 +196,9 @@ impl JMAP { "Failed to assign documentId for new thread."); IngestError::Temporary })?; + batch + .with_collection(Collection::Thread) + .create_document(thread_id); changes.log_insert(Collection::Thread, thread_id); thread_id }; @@ -202,9 +209,7 @@ impl JMAP { } // Build write batch - let mut batch = BatchBuilder::new(); batch - .with_account_id(account_id) .with_collection(Collection::Email) .create_document(document_id) .index_message( diff --git a/crates/jmap/src/mailbox/set.rs b/crates/jmap/src/mailbox/set.rs index 1e0eed21..8466a0d4 100644 --- a/crates/jmap/src/mailbox/set.rs +++ b/crates/jmap/src/mailbox/set.rs @@ -160,7 +160,7 @@ impl JMAP { batch .with_account_id(account_id) .with_collection(Collection::Mailbox) - .create_document(document_id) + .update_document(document_id) .custom(builder); if !batch.is_empty() { changes.log_update(Collection::Mailbox, document_id); diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index fc50fce5..f6d1e53d 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -27,7 +27,7 @@ jieba-rs = "0.6" # Chinese stemmer xxhash-rust = { version = "0.8.5", features = ["xxh3"] } farmhash = "1.1.5" siphasher = "0.3" -parking_lot = { version = "0.12.1", optional = true } +parking_lot = "0.12.1" lru-cache = { version = "0.1.2", optional = true } num_cpus = { version = "1.15.0", optional = true } blake3 = "1.3.3" @@ -35,11 +35,11 @@ tracing = "0.1" [features] -default = ["sqlite"] +default = ["foundation"] rocks = ["rocksdb", "rayon", "is_sync"] sqlite = ["rusqlite", "rayon", "r2d2", "num_cpus", "is_sync"] foundation = ["foundationdb", "futures", "is_async", "key_subspace"] -is_sync = ["maybe-async/is_sync", "parking_lot", "lru-cache"] +is_sync = ["maybe-async/is_sync", "lru-cache"] is_async = [] key_subspace = [] test_mode = [] diff --git a/crates/store/src/backend/foundationdb/bitmap.rs b/crates/store/src/backend/foundationdb/bitmap.rs index defadbe5..ea8fb229 100644 --- a/crates/store/src/backend/foundationdb/bitmap.rs +++ b/crates/store/src/backend/foundationdb/bitmap.rs @@ -8,56 +8,34 @@ pub const BITS_PER_BLOCK: u32 = WORD_SIZE_BITS * WORDS_PER_BLOCK; const BITS_MASK: u32 = BITS_PER_BLOCK - 1; pub struct DenseBitmap { - restore_value: u8, - restore_pos: usize, - pub block_num: u32, pub bitmap: [u8; WORD_SIZE * WORDS_PER_BLOCK as usize], } impl DenseBitmap { pub fn empty() -> Self { Self { - block_num: 0, - restore_pos: 0, - restore_value: 0, bitmap: [0; WORD_SIZE * WORDS_PER_BLOCK as usize], } } pub fn full() -> Self { Self { - block_num: 0, - restore_pos: 0, - restore_value: u8::MAX, bitmap: [u8::MAX; WORD_SIZE * WORDS_PER_BLOCK as usize], } } pub fn set(&mut self, index: u32) { - self.block_num = index / BITS_PER_BLOCK; let index = index & BITS_MASK; - self.restore_pos = (index / 8) as usize; - self.bitmap[self.restore_pos] = 1 << (index & 7); - } - - #[cfg(test)] - pub fn set_or(&mut self, index: u32) { - let _index = index; - self.block_num = index / BITS_PER_BLOCK; - let index = index & BITS_MASK; - self.restore_pos = (index / 8) as usize; - self.bitmap[self.restore_pos] |= 1 << (index & 7); + self.bitmap[(index / 8) as usize] |= 1 << (index & 7); } pub fn clear(&mut self, index: u32) { - self.block_num = index / BITS_PER_BLOCK; - let index = BITS_MASK - (index & BITS_MASK); - self.restore_pos = (index / 8) as usize; - self.bitmap[self.restore_pos] = !(1 << (index & 7)); + let index = index & BITS_MASK; + self.bitmap[(index / 8) as usize] &= !(1 << (index & 7)); } - pub fn reset(&mut self) { - self.bitmap[self.restore_pos] = self.restore_value; + pub fn block_num(index: u32) -> u32 { + index / BITS_PER_BLOCK } } @@ -156,7 +134,7 @@ mod tests { blocks .entry(item / BITS_PER_BLOCK) .or_insert_with(DenseBitmap::empty) - .set_or(item); + .set(item); } let mut bitmap_blocks = RoaringBitmap::new(); for (block_num, dense_bitmap) in blocks { @@ -184,7 +162,7 @@ mod tests { if id < 1023 { Some(id + 1) } else { None }, "reserved id failed for {id}" ); - bm.set_or(id); + bm.set(id); } } } diff --git a/crates/store/src/backend/foundationdb/mod.rs b/crates/store/src/backend/foundationdb/mod.rs index 1e4bee43..596d4a65 100644 --- a/crates/store/src/backend/foundationdb/mod.rs +++ b/crates/store/src/backend/foundationdb/mod.rs @@ -4,6 +4,7 @@ use crate::Error; pub mod bitmap; pub mod main; +pub mod purge; pub mod read; pub mod write; diff --git a/crates/store/src/backend/foundationdb/purge.rs b/crates/store/src/backend/foundationdb/purge.rs new file mode 100644 index 00000000..541a26fe --- /dev/null +++ b/crates/store/src/backend/foundationdb/purge.rs @@ -0,0 +1,98 @@ +use foundationdb::{ + options::{self, MutationType}, + FdbError, KeySelector, RangeOption, +}; +use futures::StreamExt; + +use crate::{ + write::key::KeySerializer, Store, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS, + SUBSPACE_VALUES, +}; + +use super::bitmap::DenseBitmap; + +const MAX_COMMIT_ATTEMPTS: u8 = 25; + +impl Store { + pub async fn purge_bitmaps(&self) -> crate::Result<()> { + // Obtain all empty bitmaps + let trx = self.db.create_trx()?; + let mut iter = trx.get_ranges( + RangeOption { + begin: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, 0u8][..]), + end: KeySelector::first_greater_or_equal(&[SUBSPACE_BITMAPS, u8::MAX][..]), + mode: options::StreamingMode::WantAll, + reverse: false, + ..Default::default() + }, + true, + ); + let mut delete_keys = Vec::new(); + + while let Some(values) = iter.next().await { + for value in values? { + if value.value().iter().all(|byte| *byte == 0) { + delete_keys.push(value.key().to_vec()); + } + } + } + if delete_keys.is_empty() { + return Ok(()); + } + + // Delete keys + let bitmap = DenseBitmap::empty(); + for chunk in delete_keys.chunks(1024) { + let mut retry_count = 0; + loop { + let trx = self.db.create_trx()?; + for key in chunk { + trx.atomic_op(key, &bitmap.bitmap, MutationType::CompareAndClear); + } + match trx.commit().await { + Ok(_) => { + break; + } + Err(err) => { + if retry_count < MAX_COMMIT_ATTEMPTS { + err.on_error().await?; + retry_count += 1; + } else { + return Err(FdbError::from(err).into()); + } + } + } + } + } + + Ok(()) + } + + pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> { + for subspace in [ + SUBSPACE_BITMAPS, + SUBSPACE_VALUES, + SUBSPACE_LOGS, + SUBSPACE_INDEXES, + ] { + let from_key = KeySerializer::new(std::mem::size_of::() + 2) + .write(subspace) + .write(account_id) + .write(0u8) + .finalize(); + let to_key = KeySerializer::new(std::mem::size_of::() + 2) + .write(subspace) + .write(account_id) + .write(u8::MAX) + .finalize(); + + let trx = self.db.create_trx()?; + trx.clear_range(&from_key, &to_key); + if let Err(err) = trx.commit().await { + return Err(FdbError::from(err).into()); + } + } + + Ok(()) + } +} diff --git a/crates/store/src/backend/foundationdb/read.rs b/crates/store/src/backend/foundationdb/read.rs index 0aa73204..335596f2 100644 --- a/crates/store/src/backend/foundationdb/read.rs +++ b/crates/store/src/backend/foundationdb/read.rs @@ -13,15 +13,15 @@ use roaring::RoaringBitmap; use crate::{ query::Operator, write::key::{DeserializeBigEndian, KeySerializer}, - BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, ReadTransaction, Serialize, Store, - ValueKey, SUBSPACE_INDEXES, + BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, ReadTransaction, Serialize, + Store, SUBSPACE_INDEXES, }; use super::bitmap::DeserializeBlock; impl ReadTransaction<'_> { #[inline(always)] - pub async fn get_value(&self, key: ValueKey) -> crate::Result> + pub async fn get_value(&self, key: impl Key) -> crate::Result> where U: Deserialize, { @@ -39,7 +39,7 @@ impl ReadTransaction<'_> { mut key: BitmapKey, bm: &mut RoaringBitmap, ) -> crate::Result<()> { - let begin = key.serialize(); + let begin = (&key).serialize(); key.block_num = u32::MAX; let end = key.serialize(); let key_len = begin.len(); @@ -162,6 +162,7 @@ impl ReadTransaction<'_> { ), ), }; + let key_len = begin.key().len(); let opt = RangeOption { begin, @@ -182,7 +183,6 @@ impl ReadTransaction<'_> { } } } else { - let key_len = begin.len(); while let Some(values) = range_stream.next().await { for value in values? { let key = value.key(); @@ -256,7 +256,36 @@ impl ReadTransaction<'_> { ascending: bool, cb: impl Fn(&mut T, &[u8], &[u8]) -> crate::Result + Sync + Send + 'static, ) -> crate::Result { - todo!() + let begin = begin.serialize(); + let end = end.serialize(); + + let mut iter = self.trx.get_ranges( + RangeOption { + begin: KeySelector::first_greater_or_equal(&begin), + end: KeySelector::first_greater_than(&end), + mode: if first { + options::StreamingMode::Small + } else { + options::StreamingMode::Iterator + }, + reverse: !ascending, + ..Default::default() + }, + true, + ); + + while let Some(values) = iter.next().await { + for value in values? { + let key = value.key().get(1..).unwrap_or_default(); + let value = value.value(); + + if !cb(&mut acc, key, value)? || first { + return Ok(acc); + } + } + } + + Ok(acc) } pub(crate) async fn get_last_change_id( @@ -264,24 +293,41 @@ impl ReadTransaction<'_> { account_id: u32, collection: u8, ) -> crate::Result> { - todo!() - /*let key = LogKey { + let from_key = LogKey { + account_id, + collection, + change_id: 0, + } + .serialize(); + let to_key = LogKey { account_id, collection, change_id: u64::MAX, } .serialize(); - self.conn - .prepare_cached("SELECT k FROM l WHERE k < ? ORDER BY k DESC LIMIT 1")? - .query_row([&key], |row| { - let key = row.get_ref(0)?.as_bytes()?; + let mut iter = self.trx.get_ranges( + RangeOption { + begin: KeySelector::first_greater_or_equal(&from_key), + end: KeySelector::first_greater_or_equal(&to_key), + mode: options::StreamingMode::Small, + reverse: true, + ..Default::default() + }, + true, + ); - key.deserialize_be_u64(key.len() - std::mem::size_of::()) - .map_err(|err| rusqlite::Error::ToSqlConversionFailure(err.into())) - }) - .optional() - .map_err(Into::into)*/ + while let Some(values) = iter.next().await { + if let Some(value) = (values?).into_iter().next() { + let key = value.key(); + + return key + .deserialize_be_u64(key.len() - std::mem::size_of::()) + .map(Some); + } + } + + Ok(None) } pub async fn refresh_if_old(&mut self) -> crate::Result<()> { @@ -301,4 +347,78 @@ impl Store { trx_age: Instant::now(), }) } + + #[cfg(feature = "test_mode")] + pub async fn assert_is_empty(&self) { + use crate::{SUBSPACE_BITMAPS, SUBSPACE_LOGS, SUBSPACE_VALUES}; + + // Purge bitmaps + self.purge_bitmaps().await.unwrap(); + + let conn = self.read_transaction().await.unwrap(); + + let mut iter = conn.trx.get_ranges( + RangeOption { + begin: KeySelector::first_greater_or_equal(&[0u8][..]), + end: KeySelector::first_greater_or_equal(&[u8::MAX][..]), + mode: options::StreamingMode::WantAll, + reverse: false, + ..Default::default() + }, + true, + ); + + while let Some(values) = iter.next().await { + for value in values.unwrap() { + let key = value.key(); + let value = value.value(); + let subspace = key[0]; + let key = &key[1..]; + + match subspace { + SUBSPACE_INDEXES => { + panic!( + "Table index is not empty, account {}, collection {}, document {}, property {}, value {:?}: {:?}", + u32::from_be_bytes(key[0..4].try_into().unwrap()), + key[4], + u32::from_be_bytes(key[key.len()-4..].try_into().unwrap()), + key[5], + String::from_utf8_lossy(&key[6..key.len()-4]), + key + ); + } + SUBSPACE_VALUES => { + // Ignore lastId counter + if key.len() == 4 + && value.len() == 8 + && u32::deserialize(key).is_ok() + && u64::deserialize(value).is_ok() + { + continue; + } + + panic!("Table values is not empty: {key:?} {value:?}"); + } + SUBSPACE_BITMAPS => { + panic!( + "Table bitmaps is not empty, account {}, collection {}, family {}, field {}, key {:?}: {:?}", + u32::from_be_bytes(key[0..4].try_into().unwrap()), + key[4], + key[5], + key[6], + key, + value + ); + } + SUBSPACE_LOGS => (), + + _ => panic!("Invalid key found in database: {key:?} for subspace {subspace}"), + } + } + } + + // Empty database + self.destroy().await; + crate::backend::foundationdb::write::BITMAPS.lock().clear(); + } } diff --git a/crates/store/src/backend/foundationdb/write.rs b/crates/store/src/backend/foundationdb/write.rs index ec2ae66f..afc1dee1 100644 --- a/crates/store/src/backend/foundationdb/write.rs +++ b/crates/store/src/backend/foundationdb/write.rs @@ -1,6 +1,6 @@ use std::time::{Duration, Instant}; -use ahash::AHashSet; +use ahash::{AHashMap, AHashSet}; use foundationdb::{ options::{MutationType, StreamingMode}, FdbError, KeySelector, RangeOption, @@ -13,32 +13,44 @@ use crate::{ key::{DeserializeBigEndian, KeySerializer}, now, Batch, Operation, }, - AclKey, BitmapKey, Deserialize, IndexKey, LogKey, Serialize, Store, ValueKey, BM_DOCUMENT_IDS, - SUBSPACE_VALUES, + AclKey, BitmapKey, Deserialize, IndexKey, LogKey, Serialize, Store, ValueKey, SUBSPACE_VALUES, }; use super::bitmap::{next_available_index, DenseBitmap, BITS_PER_BLOCK}; -#[cfg(feature = "test_mode")] -const ID_ASSIGNMENT_EXPIRY: u64 = 2; // seconds #[cfg(not(feature = "test_mode"))] pub const ID_ASSIGNMENT_EXPIRY: u64 = 60 * 60; // seconds - -const MAX_COMMIT_ATTEMPTS: u8 = 10; +#[cfg(not(feature = "test_mode"))] +const MAX_COMMIT_ATTEMPTS: u32 = 10; +#[cfg(not(feature = "test_mode"))] const MAX_COMMIT_TIME: Duration = Duration::from_secs(10); +#[cfg(feature = "test_mode")] +pub static ID_ASSIGNMENT_EXPIRY: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(60 * 60); // seconds +#[cfg(feature = "test_mode")] +const MAX_COMMIT_ATTEMPTS: u32 = 1000; +#[cfg(feature = "test_mode")] +const MAX_COMMIT_TIME: Duration = Duration::from_secs(3600); + +#[cfg(feature = "test_mode")] +lazy_static::lazy_static! { +pub static ref BITMAPS: std::sync::Arc, std::collections::HashSet>>> = + std::sync::Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new())); +} + impl Store { pub async fn write(&self, batch: Batch) -> crate::Result<()> { let start = Instant::now(); - let mut or_bitmap = DenseBitmap::empty(); - let mut and_bitmap = DenseBitmap::full(); - let mut block_num = u32::MAX; let mut retry_count = 0; + let mut set_bitmaps = AHashMap::new(); + let mut clear_bitmaps = AHashMap::new(); loop { let mut account_id = u32::MAX; let mut collection = u8::MAX; let mut document_id = u32::MAX; + let trx = self.db.create_trx()?; for op in &batch.ops { @@ -56,14 +68,7 @@ impl Store { Operation::DocumentId { document_id: document_id_, } => { - if block_num != u32::MAX { - or_bitmap.reset(); - and_bitmap.reset(); - } document_id = *document_id_; - or_bitmap.set(document_id); - and_bitmap.clear(document_id); - block_num = or_bitmap.block_num; } Operation::Value { family, field, set } => { let key = ValueKey { @@ -101,20 +106,26 @@ impl Store { key, set, } => { - let key = BitmapKey { - account_id, - collection, - family: *family, - field: *field, - block_num, - key, + if retry_count == 0 { + if *set { + &mut set_bitmaps + } else { + &mut clear_bitmaps + } + .entry( + BitmapKey { + account_id, + collection, + family: *family, + field: *field, + block_num: DenseBitmap::block_num(document_id), + key, + } + .serialize(), + ) + .or_insert_with(DenseBitmap::empty) + .set(document_id); } - .serialize(); - if *set { - trx.atomic_op(&key, &or_bitmap.bitmap, MutationType::BitOr); - } else { - trx.atomic_op(&key, &and_bitmap.bitmap, MutationType::BitAnd); - }; } Operation::Acl { grant_account_id, @@ -150,12 +161,97 @@ impl Store { field, family, assert_value, - } => todo!(), + } => { + let key = ValueKey { + account_id, + collection, + document_id, + family: *family, + field: *field, + } + .serialize(); + if trx + .get(&key, false) + .await + .unwrap_or_default() + .map_or(true, |bytes| !assert_value.matches(bytes.as_ref())) + { + trx.cancel(); + return Err(crate::Error::AssertValueFailed); + } + } } } + for (key, bitmap) in &set_bitmaps { + trx.atomic_op(key, &bitmap.bitmap, MutationType::BitOr); + } + + for (key, bitmap) in &clear_bitmaps { + trx.atomic_op(key, &bitmap.bitmap, MutationType::BitXor); + } + match trx.commit().await { Ok(_) => { + #[cfg(feature = "test_mode")] + { + for op in &batch.ops { + match op { + Operation::AccountId { + account_id: account_id_, + } => { + account_id = *account_id_; + } + Operation::Collection { + collection: collection_, + } => { + collection = *collection_; + } + Operation::DocumentId { + document_id: document_id_, + } => { + document_id = *document_id_; + } + Operation::Bitmap { + family, + field, + key, + set, + } => { + let key = BitmapKey { + account_id, + collection, + family: *family, + field: *field, + block_num: DenseBitmap::block_num(document_id), + key, + } + .serialize(); + if *set { + assert!( + BITMAPS + .lock() + .entry(key.clone()) + .or_default() + .insert(document_id), + "key {key:?} already contains document {document_id}" + ); + } else { + assert!( + BITMAPS + .lock() + .get_mut(&key) + .unwrap() + .remove(&document_id), + "key {key:?} does not contain document {document_id}" + ); + } + } + _ => {} + } + } + } + return Ok(()); } Err(err) => { @@ -210,7 +306,11 @@ impl Store { true, ); + #[cfg(not(feature = "test_mode"))] let expired_timestamp = now() - ID_ASSIGNMENT_EXPIRY; + #[cfg(feature = "test_mode")] + let expired_timestamp = + now() - ID_ASSIGNMENT_EXPIRY.load(std::sync::atomic::Ordering::Relaxed); let mut reserved_ids = AHashSet::new(); let mut expired_ids = Vec::new(); while let Some(values) = values.next().await { @@ -316,17 +416,11 @@ impl Store { } } - pub async fn assign_change_id( - &self, - account_id: u32, - collection: impl Into, - ) -> crate::Result { + pub async fn assign_change_id(&self, account_id: u32) -> crate::Result { let start = Instant::now(); - let collection = collection.into(); let counter = KeySerializer::new(std::mem::size_of::() + 2) .write(SUBSPACE_VALUES) - .write_leb128(account_id) - .write(collection) + .write(account_id) .finalize(); loop { diff --git a/crates/store/src/backend/sqlite/read.rs b/crates/store/src/backend/sqlite/read.rs index 1209a404..fa915efc 100644 --- a/crates/store/src/backend/sqlite/read.rs +++ b/crates/store/src/backend/sqlite/read.rs @@ -275,7 +275,6 @@ impl ReadTransaction<'_> { let mut rows = query.query([&begin, &end])?; while let Some(row) = rows.next()? { - //TODO remove subspace in Foundation let key = row.get_ref(0)?.as_bytes()?; let value = row.get_ref(1)?.as_bytes()?; diff --git a/crates/store/src/write/key.rs b/crates/store/src/write/key.rs index 3139505d..05d36f2d 100644 --- a/crates/store/src/write/key.rs +++ b/crates/store/src/write/key.rs @@ -253,7 +253,7 @@ impl Serialize for &AclKey { { #[cfg(feature = "key_subspace")] { - KeySerializer::new(std::mem::size_of::() + 1).write(crate::SUBSPACE_ACLS) + KeySerializer::new(std::mem::size_of::() + 1).write(crate::SUBSPACE_VALUES) } #[cfg(not(feature = "key_subspace"))] { diff --git a/tests/src/jmap/auth_limits.rs b/tests/src/jmap/auth_limits.rs index c44ff1fe..e443c653 100644 --- a/tests/src/jmap/auth_limits.rs +++ b/tests/src/jmap/auth_limits.rs @@ -142,7 +142,7 @@ pub async fn test(server: Arc, admin_client: &mut Client) { Err(jmap_client::Error::Problem(err)) if err.status() == Some(400))); // Wait for sleep to be done - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(1500)).await; // Concurrent upload test for _ in 0..4 { diff --git a/tests/src/jmap/delivery.rs b/tests/src/jmap/delivery.rs index 5dfef4a5..2fc3e33f 100644 --- a/tests/src/jmap/delivery.rs +++ b/tests/src/jmap/delivery.rs @@ -237,7 +237,7 @@ impl SmtpConnection { } self.data(3).await; let result = self.data_bytes(message, recipients.len(), code).await; - tokio::time::sleep(Duration::from_millis(200)).await; + tokio::time::sleep(Duration::from_millis(500)).await; result } @@ -260,7 +260,7 @@ impl SmtpConnection { self.bdat(std::str::from_utf8(chunk).unwrap(), 2).await; } self.bdat_last("", recipients.len(), 2).await; - tokio::time::sleep(Duration::from_millis(200)).await; + tokio::time::sleep(Duration::from_millis(500)).await; } pub async fn connect() -> Self { diff --git a/tests/src/jmap/email_changes.rs b/tests/src/jmap/email_changes.rs index 454c80aa..5be401a5 100644 --- a/tests/src/jmap/email_changes.rs +++ b/tests/src/jmap/email_changes.rs @@ -14,6 +14,8 @@ use store::{ pub async fn test(server: Arc, client: &mut Client) { println!("Running Email Changes tests..."); + server.store.destroy().await; + client.set_default_account_id(Id::new(1)); let mut states = vec![State::Initial]; for (change_id, (changes, expected_changelog)) in [ @@ -247,9 +249,30 @@ pub async fn test(server: Arc, client: &mut Client) { } } - assert_eq!(insertions.len(), 0); - assert_eq!(updates.len(), 0); - assert_eq!(deletions.len(), 0); + assert_eq!( + insertions.len(), + 0, + "test_num: {}, state: {:?}, pending: {:?}", + test_num, + state, + insertions + ); + assert_eq!( + updates.len(), + 0, + "test_num: {}, state: {:?}, pending: {:?}", + test_num, + state, + updates + ); + assert_eq!( + deletions.len(), + 0, + "test_num: {}, state: {:?}, pending: {:?}", + test_num, + state, + deletions + ); } } diff --git a/tests/src/jmap/email_query_changes.rs b/tests/src/jmap/email_query_changes.rs index 51904761..94a019d9 100644 --- a/tests/src/jmap/email_query_changes.rs +++ b/tests/src/jmap/email_query_changes.rs @@ -20,7 +20,7 @@ use crate::jmap::{ pub async fn test(server: Arc, client: &mut Client) { println!("Running Email QueryChanges tests..."); - + server.store.destroy().await; let mailbox1_id = client .set_default_account_id(Id::new(1).to_string()) .mailbox_create("JMAP Changes 1", None::, Role::None) @@ -117,31 +117,18 @@ pub async fn test(server: Arc, client: &mut Client) { LogAction::Delete(id) => { let id = *id_map.get(id).unwrap(); client.email_destroy(&id.to_string()).await.unwrap(); - - // Delete virtual threadId created during tests (so assert_empty_store succeeds) - server - .store - .write( - BatchBuilder::new() - .with_account_id(1) - .with_collection(Collection::Email) - .update_document(id.document_id()) - .bitmap(Property::ThreadId, id.prefix_id(), F_CLEAR) - .build_batch(), - ) - .await - .unwrap(); removed_ids.insert(id); } LogAction::Move(from, to) => { let id = *id_map.get(from).unwrap(); let new_id = Id::from_parts(thread_id, id.document_id()); - server .store .write( BatchBuilder::new() .with_account_id(1) + .with_collection(Collection::Thread) + .create_document(thread_id) .with_collection(Collection::Email) .update_document(id.document_id()) .value(Property::ThreadId, id.prefix_id(), F_BITMAP | F_CLEAR) @@ -265,6 +252,19 @@ pub async fn test(server: Arc, client: &mut Client) { destroy_all_mailboxes(client).await; + // Delete virtual threads + let mut batch = BatchBuilder::new(); + batch.with_account_id(1).with_collection(Collection::Thread); + for thread_id in server + .get_document_ids(1, Collection::Thread) + .await + .unwrap() + .unwrap_or_default() + { + batch.delete_document(thread_id); + } + server.store.write(batch.build_batch()).await.unwrap(); + server.store.assert_is_empty().await; } diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index 8ac8ad70..dbfc6350 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -186,7 +186,7 @@ pub async fn jmap_tests() { email_copy::test(params.server.clone(), &mut params.client).await; thread_get::test(params.server.clone(), &mut params.client).await; thread_merge::test(params.server.clone(), &mut params.client).await; - mailbox::test(params.server.clone(), &mut params.client).await;*/ + mailbox::test(params.server.clone(), &mut params.client).await; delivery::test(params.server.clone(), &mut params.client).await; auth_acl::test(params.server.clone(), &mut params.client).await; auth_limits::test(params.server.clone(), &mut params.client).await; @@ -196,7 +196,7 @@ pub async fn jmap_tests() { sieve_script::test(params.server.clone(), &mut params.client).await; vacation_response::test(params.server.clone(), &mut params.client).await; email_submission::test(params.server.clone(), &mut params.client).await; - websocket::test(params.server.clone(), &mut params.client).await; + websocket::test(params.server.clone(), &mut params.client).await;*/ stress_test::test(params.server.clone(), params.client).await; if delete { @@ -257,6 +257,10 @@ async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest { ); } + if delete_if_exists { + jmap.store.destroy().await; + } + // Create client let mut client = Client::new() .credentials(Credentials::basic("admin", "secret")) diff --git a/tests/src/jmap/push_subscription.rs b/tests/src/jmap/push_subscription.rs index c9fbe214..8013c67c 100644 --- a/tests/src/jmap/push_subscription.rs +++ b/tests/src/jmap/push_subscription.rs @@ -175,7 +175,7 @@ pub async fn test(server: Arc, admin_client: &mut Client) { //expect_nothing(&mut event_rx).await; // Multiple change updates should be grouped and pushed in intervals - for num in 0..25 { + for num in 0..5 { client .mailbox_update_sort_order(&mailbox_id, num) .await diff --git a/tests/src/store/assign_id.rs b/tests/src/store/assign_id.rs index 758c213b..e904acd1 100644 --- a/tests/src/store/assign_id.rs +++ b/tests/src/store/assign_id.rs @@ -7,10 +7,16 @@ use store::{write::BatchBuilder, Store}; pub async fn test(db: Arc) { println!("Running Store ID assignment tests..."); + store::backend::foundationdb::write::ID_ASSIGNMENT_EXPIRY + .store(2, std::sync::atomic::Ordering::Relaxed); + test_1(db.clone()).await; test_2(db.clone()).await; test_3(db.clone()).await; test_4(db).await; + + store::backend::foundationdb::write::ID_ASSIGNMENT_EXPIRY + .store(60 * 60, std::sync::atomic::Ordering::Relaxed); } async fn test_1(db: Arc) { diff --git a/tests/src/store/mod.rs b/tests/src/store/mod.rs index eb7c3423..0397d535 100644 --- a/tests/src/store/mod.rs +++ b/tests/src/store/mod.rs @@ -30,8 +30,8 @@ pub async fn store_tests() { if insert { db.destroy().await; } - assign_id::test(db).await; - //query::test(db, insert).await; + //assign_id::test(db).await; + query::test(db, insert).await; temp_dir.delete(); }