Mailbox implementation before testing

This commit is contained in:
Mauro D 2023-05-02 15:51:26 +00:00
parent 9360de22b7
commit a3a0396772
43 changed files with 3642 additions and 457 deletions

View file

@ -77,7 +77,7 @@ impl JsonObjectParser for ChangesRequest {
while let Some(key) = parser.next_dict_key::<RequestProperty>()? {
match &key.hash[0] {
0x6449_746e_756f_6363_61 => {
0x0064_4974_6e75_6f63_6361 => {
request.account_id = parser.next_token::<Id>()?.unwrap_string("accountId")?;
}
0x6574_6174_5365_636e_6973 => {

View file

@ -79,10 +79,10 @@ impl JsonObjectParser for GetRequest<RequestArguments> {
while let Some(key) = parser.next_dict_key::<RequestProperty>()? {
match &key.hash[0] {
0x6449_746e_756f_6363_61 if !key.is_ref => {
0x0064_4974_6e75_6f63_6361 if !key.is_ref => {
request.account_id = parser.next_token::<Id>()?.unwrap_string("accountId")?;
}
0x7364_69 => {
0x0073_6469 => {
request.ids = if !key.is_ref {
<Option<Vec<Id>>>::parse(parser)?.map(MaybeReference::Value)
} else {
@ -138,13 +138,16 @@ impl GetRequest<RequestArguments> {
}
impl<T> GetRequest<T> {
pub fn unwrap_properties(&mut self) -> Option<Vec<Property>> {
let mut properties = self.properties.take()?.unwrap();
// Add Id Property
if !properties.contains(&Property::Id) {
properties.push(Property::Id);
pub fn unwrap_properties(&mut self, default: &[Property]) -> Vec<Property> {
if let Some(mut properties) = self.properties.take().map(|p| p.unwrap()) {
// Add Id Property
if !properties.contains(&Property::Id) {
properties.push(Property::Id);
}
properties
} else {
default.to_vec()
}
Some(properties)
}
pub fn unwrap_ids(

View file

@ -29,13 +29,13 @@ use crate::{
use super::ahash_is_empty;
#[derive(Debug, Clone)]
pub struct SetRequest {
pub struct SetRequest<T> {
pub account_id: Id,
pub if_in_state: Option<State>,
pub create: Option<VecMap<String, Object<SetValue>>>,
pub update: Option<VecMap<Id, Object<SetValue>>>,
pub destroy: Option<MaybeReference<Vec<Id>, ResultReference>>,
pub arguments: RequestArguments,
pub arguments: T,
}
#[derive(Debug, Clone)]
@ -89,7 +89,7 @@ pub struct SetResponse {
pub not_destroyed: VecMap<Id, SetError>,
}
impl JsonObjectParser for SetRequest {
impl JsonObjectParser for SetRequest<RequestArguments> {
fn parse(parser: &mut Parser) -> crate::parser::Result<Self>
where
Self: Sized,
@ -346,7 +346,7 @@ impl RequestPropertyParser for RequestArguments {
}
}
impl SetRequest {
impl<T> SetRequest<T> {
pub fn validate(&self, max_objects_in_set: usize) -> Result<(), MethodError> {
if self.create.as_ref().map_or(0, |objs| objs.len())
+ self.update.as_ref().map_or(0, |objs| objs.len())
@ -365,6 +365,14 @@ impl SetRequest {
}
}
pub fn has_updates(&self) -> bool {
self.update.as_ref().map_or(false, |objs| !objs.is_empty())
}
pub fn has_creates(&self) -> bool {
self.create.as_ref().map_or(false, |objs| !objs.is_empty())
}
pub fn unwrap_create(&mut self) -> VecMap<String, Object<SetValue>> {
self.create.take().unwrap_or_default()
}
@ -381,7 +389,31 @@ impl SetRequest {
}
}
impl SetRequest<RequestArguments> {
pub fn take_arguments(&mut self) -> RequestArguments {
std::mem::replace(&mut self.arguments, RequestArguments::Principal)
}
pub fn with_arguments<T>(self, arguments: T) -> SetRequest<T> {
SetRequest {
account_id: self.account_id,
if_in_state: self.if_in_state,
create: self.create,
update: self.update,
destroy: self.destroy,
arguments,
}
}
}
impl SetResponse {
pub fn created(&mut self, id: String, document_id: u32) {
self.created.insert(
id,
Object::with_capacity(1).with_property(Property::Id, Value::Id(document_id.into())),
);
}
pub fn invalid_property_create(&mut self, id: String, property: impl Into<InvalidProperty>) {
self.not_created.append(
id,

View file

@ -0,0 +1,512 @@
use std::{borrow::Cow, collections::HashSet};
use store::{
fts::builder::ToTokens,
write::{BatchBuilder, IntoOperations, Operation},
Serialize, BM_TAG, HASH_EXACT,
};
use crate::{
error::set::SetError,
types::{id::Id, property::Property, value::Value},
};
use super::Object;
#[derive(Debug, Clone, Default)]
pub struct ObjectIndexBuilder {
index: &'static [IndexProperty],
current: Option<Object<Value>>,
changes: Option<Object<Value>>,
}
#[derive(Debug, Clone, Copy, Default)]
pub enum IndexAs {
Text {
tokenize: bool,
index: bool,
},
TextList {
tokenize: bool,
index: bool,
},
Integer,
IntegerList,
LongInteger,
HasProperty,
#[default]
None,
}
#[derive(Debug, Clone)]
pub struct IndexProperty {
property: Property,
index_as: IndexAs,
required: bool,
max_size: usize,
}
impl ObjectIndexBuilder {
pub fn new(index: &'static [IndexProperty]) -> Self {
Self {
index,
current: None,
changes: None,
}
}
pub fn with_current(mut self, current: Object<Value>) -> Self {
self.current = Some(current);
self
}
pub fn with_changes(mut self, changes: Object<Value>) -> Self {
self.changes = Some(changes);
self
}
pub fn with_current_opt(mut self, current: Option<Object<Value>>) -> Self {
self.current = current;
self
}
pub fn validate(self) -> Result<Self, SetError> {
for item in self.index {
if item.required || item.max_size > 0 {
let value = self
.changes
.as_ref()
.and_then(|c| c.properties.get(&item.property))
.or_else(|| {
self.current
.as_ref()
.and_then(|c| c.properties.get(&item.property))
});
let error: Cow<str> = match value {
None if item.required => "Property cannot be empty.".into(),
Some(Value::Text(text)) => {
if item.required && text.trim().is_empty() {
"Property cannot be empty.".into()
} else if item.max_size > 0 && text.len() > item.max_size {
format!("Property cannot be longer than {} bytes.", item.max_size)
.into()
} else {
continue;
}
}
_ => continue,
};
return Err(SetError::invalid_properties()
.with_property(item.property.clone())
.with_description(error));
}
}
Ok(self)
}
}
impl IntoOperations for ObjectIndexBuilder {
fn build(self, batch: &mut BatchBuilder) {
match (self.current, self.changes) {
(None, Some(changes)) => {
// Insertion
build_batch(batch, self.index, &changes, true);
batch.ops.push(Operation::Value {
field: Property::Value.into(),
family: 0,
set: changes.serialize().into(),
});
}
(Some(current), Some(changes)) => {
// Update
merge_batch(batch, self.index, current, changes);
}
(Some(current), None) => {
// Deletion
build_batch(batch, self.index, &current, true);
batch.ops.push(Operation::Value {
field: Property::Value.into(),
family: 0,
set: None,
});
}
(None, None) => unreachable!(),
}
}
}
fn merge_batch(
batch: &mut BatchBuilder,
index: &'static [IndexProperty],
mut current: Object<Value>,
changes: Object<Value>,
) {
let mut has_changes = false;
for (property, value) in changes.properties {
let current_value = current.get(&property);
if current_value == &value {
continue;
}
match index
.iter()
.find_map(|i| {
if i.property == property {
Some(i.index_as)
} else {
None
}
})
.unwrap_or_default()
{
IndexAs::Text { tokenize, index } => {
// Remove current text from index
let mut add_tokens = HashSet::new();
let mut remove_tokens = HashSet::new();
if let Some(text) = current_value.as_string() {
if index {
batch.ops.push(Operation::Index {
field: property.clone().into(),
key: text.serialize(),
set: false,
});
}
if tokenize {
remove_tokens = text.to_tokens();
}
}
// Add new text to index
if let Some(text) = value.as_string() {
if index {
batch.ops.push(Operation::Index {
field: property.clone().into(),
key: text.serialize(),
set: true,
});
}
if tokenize {
for token in text.to_tokens() {
if !remove_tokens.remove(&token) {
add_tokens.insert(token);
}
}
}
}
// Update tokens
for (token, set) in [(add_tokens, true), (remove_tokens, false)] {
for token in token {
batch.ops.push(Operation::hash(
&token,
HASH_EXACT,
property.clone().into(),
set,
));
}
}
}
IndexAs::TextList { tokenize, index } => {
let mut add_tokens = HashSet::new();
let mut remove_tokens = HashSet::new();
let mut add_values = HashSet::new();
let mut remove_values = HashSet::new();
// Remove current text from index
if let Some(current_values) = current_value.as_list() {
for current_value in current_values {
if let Some(text) = current_value.as_string() {
if index {
remove_values.insert(text);
}
if tokenize {
remove_tokens.extend(text.to_tokens());
}
}
}
}
// Add new text to index
if let Some(values) = value.as_list() {
for value in values {
if let Some(text) = value.as_string() {
if index && !remove_values.remove(text) {
add_values.insert(text);
}
if tokenize {
for token in text.to_tokens() {
if !remove_tokens.remove(&token) {
add_tokens.insert(token);
}
}
}
}
}
}
// Update index
for (values, set) in [(add_values, true), (remove_values, false)] {
for value in values {
batch.ops.push(Operation::Index {
field: property.clone().into(),
key: value.serialize(),
set,
});
}
}
// Update tokens
for (token, set) in [(add_tokens, true), (remove_tokens, false)] {
for token in token {
batch.ops.push(Operation::hash(
&token,
HASH_EXACT,
property.clone().into(),
set,
));
}
}
}
index_as @ (IndexAs::Integer | IndexAs::LongInteger) => {
if let Some(current_value) = current_value.try_cast_uint() {
batch.ops.push(Operation::Index {
field: property.clone().into(),
key: current_value.into_index(index_as),
set: false,
});
}
if let Some(value) = value.try_cast_uint() {
batch.ops.push(Operation::Index {
field: property.clone().into(),
key: value.into_index(index_as),
set: false,
});
}
}
IndexAs::IntegerList => {
let mut add_values = HashSet::new();
let mut remove_values = HashSet::new();
if let Some(current_values) = current_value.as_list() {
for current_value in current_values {
if let Some(current_value) = current_value.try_cast_uint() {
remove_values.insert(current_value);
}
}
}
if let Some(values) = value.as_list() {
for value in values {
if let Some(value) = value.try_cast_uint() {
if !remove_values.remove(&value) {
add_values.insert(value);
}
}
}
}
for (values, set) in [(add_values, true), (remove_values, false)] {
for value in values {
batch.ops.push(Operation::Index {
field: property.clone().into(),
key: (value as u32).serialize(),
set,
});
}
}
}
IndexAs::HasProperty => {
if current_value == &Value::Null {
batch.ops.push(Operation::Bitmap {
family: BM_TAG,
field: property.clone().into(),
key: vec![],
set: true,
});
} else if value == Value::Null {
batch.ops.push(Operation::Bitmap {
family: BM_TAG,
field: property.clone().into(),
key: vec![],
set: false,
});
}
}
IndexAs::None => (),
}
if value != Value::Null {
current.set(property, value);
} else {
current.remove(&property);
}
has_changes = true;
}
if has_changes {
batch.ops.push(Operation::Value {
field: Property::Value.into(),
family: 0,
set: current.serialize().into(),
});
}
}
fn build_batch(
batch: &mut BatchBuilder,
index: &'static [IndexProperty],
object: &Object<Value>,
set: bool,
) {
for item in index {
match (object.get(&item.property), item.index_as) {
(Value::Text(text), IndexAs::Text { tokenize, index }) => {
if index {
batch.ops.push(Operation::Index {
field: (&item.property).into(),
key: text.serialize(),
set,
});
}
if tokenize {
for token in text.to_tokens() {
batch.ops.push(Operation::hash(
&token,
HASH_EXACT,
(&item.property).into(),
true,
));
}
}
}
(Value::List(values), IndexAs::TextList { tokenize, index }) => {
let mut tokens = HashSet::new();
let mut indexes = HashSet::new();
for value in values {
if let Some(text) = value.as_string() {
if index {
indexes.insert(text);
}
if tokenize {
tokens.extend(text.to_tokens());
}
}
}
for text in indexes {
batch.ops.push(Operation::Index {
field: (&item.property).into(),
key: text.serialize(),
set,
});
}
for token in tokens {
batch.ops.push(Operation::hash(
&token,
HASH_EXACT,
(&item.property).into(),
true,
));
}
}
(Value::UnsignedInt(integer), IndexAs::Integer | IndexAs::LongInteger) => {
batch.ops.push(Operation::Index {
field: (&item.property).into(),
key: integer.into_index(item.index_as),
set,
});
}
(Value::Id(id), IndexAs::Integer | IndexAs::LongInteger) => {
batch.ops.push(Operation::Index {
field: (&item.property).into(),
key: id.into_index(item.index_as),
set,
});
}
(Value::List(values), IndexAs::IntegerList) => {
for value in values
.iter()
.map(|value| match value {
Value::UnsignedInt(integer) => *integer as u32,
Value::Id(id) => id.document_id(),
_ => unreachable!(),
})
.collect::<HashSet<_>>()
{
batch.ops.push(Operation::Index {
field: (&item.property).into(),
key: value.into_index(item.index_as),
set,
});
}
}
(value, IndexAs::HasProperty) if value != &Value::Null => {
batch.ops.push(Operation::Bitmap {
family: BM_TAG,
field: (&item.property).into(),
key: vec![],
set,
});
}
_ => (),
}
}
}
impl IndexProperty {
pub const fn new(property: Property) -> Self {
Self {
property,
required: false,
max_size: 0,
index_as: IndexAs::None,
}
}
pub const fn required(mut self) -> Self {
self.required = true;
self
}
pub const fn max_size(mut self, max_size: usize) -> Self {
self.max_size = max_size;
self
}
pub const fn index_as(mut self, index_as: IndexAs) -> Self {
self.index_as = index_as;
self
}
}
trait IntoIndex {
fn into_index(self, index_as: IndexAs) -> Vec<u8>;
}
impl IntoIndex for &u64 {
fn into_index(self, index_as: IndexAs) -> Vec<u8> {
match index_as {
IndexAs::Integer => (*self as u32).serialize(),
IndexAs::LongInteger => self.serialize(),
_ => unreachable!(),
}
}
}
impl IntoIndex for &u32 {
fn into_index(self, index_as: IndexAs) -> Vec<u8> {
match index_as {
IndexAs::Integer => self.serialize(),
_ => unreachable!(),
}
}
}
impl IntoIndex for &Id {
fn into_index(self, index_as: IndexAs) -> Vec<u8> {
match index_as {
IndexAs::Integer => self.document_id().serialize(),
IndexAs::LongInteger => self.id().serialize(),
_ => unreachable!(),
}
}
}

View file

@ -10,8 +10,8 @@ pub struct SetArguments {
#[derive(Debug, Clone, Default)]
pub struct QueryArguments {
sort_as_tree: Option<bool>,
filter_as_tree: Option<bool>,
pub sort_as_tree: Option<bool>,
pub filter_as_tree: Option<bool>,
}
impl RequestPropertyParser for SetArguments {
@ -21,7 +21,7 @@ impl RequestPropertyParser for SetArguments {
property: RequestProperty,
) -> crate::parser::Result<bool> {
if property.hash[0] == 0x4565_766f_6d65_5279_6f72_7473_6544_6e6f
&& property.hash[1] == 0x736c_6961_6d
&& property.hash[1] == 0x0073_6c69_616d
{
self.on_destroy_remove_emails = parser
.next_token::<Ignore>()?

View file

@ -1,5 +1,6 @@
pub mod email;
pub mod email_submission;
pub mod index;
pub mod mailbox;
pub mod sieve;

View file

@ -20,7 +20,7 @@ use crate::{
query::{self, QueryRequest},
query_changes::QueryChangesRequest,
search_snippet::GetSearchSnippetRequest,
set::SetRequest,
set::{self, SetRequest},
validate::ValidateSieveScriptRequest,
},
parser::{json::Parser, JsonObjectParser},
@ -52,7 +52,7 @@ pub struct RequestProperty {
#[derive(Debug)]
pub enum RequestMethod {
Get(GetRequest<get::RequestArguments>),
Set(SetRequest),
Set(SetRequest<set::RequestArguments>),
Changes(ChangesRequest),
Copy(CopyRequest),
CopyBlob(CopyBlobRequest),

View file

@ -50,7 +50,7 @@ pub struct Response {
#[serde(serialize_with = "serialize_hex")]
pub session_state: u32,
#[serde(rename(deserialize = "createdIds"))]
#[serde(rename = "createdIds")]
pub created_ids: HashMap<String, Id>,
}

View file

@ -3,7 +3,7 @@ use std::collections::HashMap;
use utils::map::vec_map::VecMap;
use crate::{
error::method::MethodError,
error::{method::MethodError, set::SetError},
object::Object,
request::{
reference::{MaybeReference, ResultReference},
@ -54,6 +54,17 @@ impl Response {
// Perform topological sort
if !graph.is_empty() {
// Make sure all references exist
for (from_id, to_ids) in graph.iter() {
for to_id in to_ids {
if !create.contains_key(to_id) {
return Err(MethodError::InvalidResultReference(format!(
"Invalid reference to non-existing object {to_id:?} from {from_id:?}"
)));
}
}
}
let mut sorted_create = VecMap::with_capacity(create.len());
let mut it_stack = Vec::new();
let keys = graph.keys().cloned().collect::<Vec<_>>();
@ -298,7 +309,10 @@ impl Object<SetValue> {
pub fn iterate_and_eval_references(
self,
response: &Response,
) -> impl Iterator<Item = Result<(Property, MaybePatchValue), MethodError>> + '_ {
) -> impl Iterator<Item = Result<(Property, MaybePatchValue), SetError>> + '_ {
// Resolve id references, which were previously validated.
// If the ID is not found it means that set failed for that id, so we return a setError
// instead of failing the entire request with a MethodError::InvalidResultReference.
self.properties
.into_iter()
.map(|(property, set_value)| match set_value {
@ -308,9 +322,8 @@ impl Object<SetValue> {
if let Some(id) = response.created_ids.get(&id_ref) {
Ok((property, MaybePatchValue::Value(Value::Id(*id))))
} else {
Err(MethodError::InvalidResultReference(format!(
"Id reference {id_ref:?} not found."
)))
Err(SetError::not_found()
.with_description(format!("Id reference {id_ref:?} not found.")))
}
}
SetValue::IdReference(MaybeReference::Value(id)) => {
@ -327,7 +340,7 @@ impl Object<SetValue> {
if let Some(id) = response.created_ids.get(&id_ref) {
ids.push(Value::Id(*id));
} else {
return Err(MethodError::InvalidResultReference(format!(
return Err(SetError::not_found().with_description(format!(
"Id reference {id_ref:?} not found."
)));
}

View file

@ -57,6 +57,7 @@ impl JsonObjectParser for Keyword {
where
Self: Sized,
{
let pos = parser.pos;
if parser
.next_unescaped()?
.ok_or_else(|| parser.error_value())?
@ -93,8 +94,7 @@ impl JsonObjectParser for Keyword {
if parser.is_eof || parser.skip_string() {
Ok(Keyword::Other(
String::from_utf8_lossy(parser.bytes[parser.pos_marker..parser.pos - 1].as_ref())
.into_owned(),
String::from_utf8_lossy(parser.bytes[pos..parser.pos - 1].as_ref()).into_owned(),
))
} else {
Err(parser.error_unterminated())

View file

@ -142,7 +142,7 @@ impl JsonObjectParser for Property {
} else {
first_char = ch;
}
} else if ch == b':' && first_char == b'h' && hash == 0x7265_6461_65 {
} else if ch == b':' && first_char == b'h' && hash == 0x0072_6564_6165 {
return parse_header_property(parser);
} else {
return parser.invalid_property();
@ -188,7 +188,7 @@ impl JsonObjectParser for SetProperty {
is_patch = true;
break;
}
b':' if first_char == b'h' && hash == 0x7265_6461_65 && !is_ref => {
b':' if first_char == b'h' && hash == 0x0072_6564_6165 && !is_ref => {
return parse_header_property(parser).map(|property| SetProperty {
property,
patch: vec![],
@ -534,7 +534,7 @@ impl JsonObjectParser for ObjectProperty {
} else {
first_char = ch;
}
} else if ch == b':' && first_char == b'h' && hash == 0x7265_6461_65 {
} else if ch == b':' && first_char == b'h' && hash == 0x0072_6564_6165 {
return parse_header_property(parser).map(ObjectProperty);
} else {
return parser.invalid_property().map(ObjectProperty);
@ -544,11 +544,11 @@ impl JsonObjectParser for ObjectProperty {
Ok(ObjectProperty(match first_char {
b'a' => match hash {
0x7365_7373_6572_6464 => Property::Addresses,
0x6874_75 => Property::Auth,
0x0068_7475 => Property::Auth,
_ => parser.invalid_property()?,
},
b'b' => match hash {
0x6449_626f_6c => Property::BlobId,
0x0064_4962_6f6c => Property::BlobId,
_ => parser.invalid_property()?,
},
b'c' => match hash {
@ -571,7 +571,7 @@ impl JsonObjectParser for ObjectProperty {
_ => parser.invalid_property()?,
},
b'i' => match hash {
0x656c_626f_7250_676e_6964_6f63_6e45_73 => Property::IsEncodingProblem,
0x0065_6c62_6f72_5067_6e69_646f_636e_4573 => Property::IsEncodingProblem,
0x6465_7461_636e_7572_5473 => Property::IsTruncated,
_ => parser.invalid_property()?,
},

View file

@ -220,6 +220,13 @@ impl Value {
}
}
pub fn try_unwrap_id(self) -> Option<Id> {
match self {
Value::Id(i) => Some(i),
_ => None,
}
}
pub fn try_unwrap_blob_id(self) -> Option<BlobId> {
match self {
Value::BlobId(b) => Some(b),
@ -233,6 +240,35 @@ impl Value {
_ => None,
}
}
pub fn as_string(&self) -> Option<&str> {
match self {
Value::Text(s) => Some(s),
_ => None,
}
}
pub fn as_id(&self) -> Option<&Id> {
match self {
Value::Id(id) => Some(id),
_ => None,
}
}
pub fn as_list(&self) -> Option<&Vec<Value>> {
match self {
Value::List(l) => Some(l),
_ => None,
}
}
pub fn try_cast_uint(&self) -> Option<u64> {
match self {
Value::UnsignedInt(u) => Some(*u),
Value::Id(id) => Some(id.id()),
_ => None,
}
}
}
impl<T: JsonObjectParser + Display + Eq> JsonObjectParser for SetValueMap<T> {

View file

@ -12,6 +12,9 @@ impl crate::Config {
query_max_results: settings
.property("jmap.protocol.query.max-results")?
.unwrap_or(5000),
changes_max_results: settings
.property("jmap.protocol.changes.max-results")?
.unwrap_or(5000),
request_max_size: settings
.property("jmap.protocol.request.max-size")?
.unwrap_or(10000000),

View file

@ -1,10 +1,7 @@
use jmap_proto::{
error::{method::MethodError, request::RequestError},
method::{get, query},
request::{
method::{MethodName, MethodObject},
Request, RequestMethod,
},
error::request::RequestError,
method::{get, query, set},
request::{method::MethodName, Request, RequestMethod},
response::{Response, ResponseMethod},
};
@ -34,7 +31,7 @@ impl JMAP {
get::RequestArguments::Email(arguments) => {
self.email_get(req.with_arguments(arguments)).await.into()
}
get::RequestArguments::Mailbox => todo!(),
get::RequestArguments::Mailbox => self.mailbox_get(req).await.into(),
get::RequestArguments::Thread => self.thread_get(req).await.into(),
get::RequestArguments::Identity => todo!(),
get::RequestArguments::EmailSubmission => todo!(),
@ -47,14 +44,26 @@ impl JMAP {
query::RequestArguments::Email(arguments) => {
self.email_query(req.with_arguments(arguments)).await.into()
}
query::RequestArguments::Mailbox(_) => todo!(),
query::RequestArguments::Mailbox(arguments) => self
.mailbox_query(req.with_arguments(arguments))
.await
.into(),
query::RequestArguments::EmailSubmission => todo!(),
query::RequestArguments::SieveScript => todo!(),
query::RequestArguments::Principal => todo!(),
},
RequestMethod::Set(req) => match call.name.obj {
MethodObject::Email => self.email_set(req, &response).await.into(),
_ => MethodError::UnknownMethod(format!("{}/set", call.name.obj)).into(),
RequestMethod::Set(mut req) => match req.take_arguments() {
set::RequestArguments::Email => self.email_set(req, &response).await.into(),
set::RequestArguments::Mailbox(arguments) => self
.mailbox_set(req.with_arguments(arguments), &response)
.await
.into(),
set::RequestArguments::Identity => todo!(),
set::RequestArguments::EmailSubmission(_) => todo!(),
set::RequestArguments::PushSubscription => todo!(),
set::RequestArguments::SieveScript(_) => todo!(),
set::RequestArguments::VacationResponse => todo!(),
set::RequestArguments::Principal => todo!(),
},
RequestMethod::Changes(_) => todo!(),
RequestMethod::Copy(_) => todo!(),

View file

@ -0,0 +1,164 @@
use jmap_proto::{
error::method::MethodError,
method::changes::{ChangesRequest, ChangesResponse, RequestArguments},
types::{collection::Collection, property::Property, state::State},
};
use store::query::log::{Change, Changes, Query};
use crate::JMAP;
impl JMAP {
pub async fn changes(&self, request: ChangesRequest) -> Result<ChangesResponse, MethodError> {
let collection = match request.arguments {
RequestArguments::Email => Collection::Email,
RequestArguments::Mailbox => Collection::Mailbox,
RequestArguments::Thread => Collection::Thread,
RequestArguments::Identity => Collection::Identity,
RequestArguments::EmailSubmission => Collection::EmailSubmission,
};
let max_changes = if self.config.changes_max_results > 0
&& self.config.changes_max_results < request.max_changes.unwrap_or(0)
{
self.config.changes_max_results
} else {
request.max_changes.unwrap_or(0)
};
let mut response = ChangesResponse {
account_id: request.account_id,
old_state: State::Initial,
new_state: State::Initial,
has_more_changes: false,
created: vec![],
updated: vec![],
destroyed: vec![],
updated_properties: None,
};
let account_id = request.account_id.document_id();
let (items_sent, mut changelog) = match &request.since_state {
State::Initial => {
let changelog = self
.query_changes(account_id, collection, Query::All)
.await?
.unwrap();
if changelog.changes.is_empty() && changelog.from_change_id == 0 {
return Ok(response);
}
(0, changelog)
}
State::Exact(change_id) => (
0,
self.query_changes(account_id, collection, Query::Since(*change_id))
.await?
.ok_or_else(|| {
MethodError::InvalidArguments(
"The specified stateId does could not be found.".to_string(),
)
})?,
),
State::Intermediate(intermediate_state) => {
let mut changelog = self
.query_changes(
account_id,
collection,
Query::RangeInclusive(intermediate_state.from_id, intermediate_state.to_id),
)
.await?
.ok_or_else(|| {
MethodError::InvalidArguments(
"The specified stateId does could not be found.".to_string(),
)
})?;
if intermediate_state.items_sent >= changelog.changes.len() {
(
0,
self.query_changes(
account_id,
collection,
Query::Since(intermediate_state.to_id),
)
.await?
.ok_or_else(|| {
MethodError::InvalidArguments(
"The specified stateId does could not be found.".to_string(),
)
})?,
)
} else {
changelog.changes.drain(
(changelog.changes.len() - intermediate_state.items_sent)
..changelog.changes.len(),
);
(intermediate_state.items_sent, changelog)
}
}
};
if max_changes > 0 && changelog.changes.len() > max_changes {
changelog
.changes
.drain(0..(changelog.changes.len() - max_changes));
response.has_more_changes = true;
};
let mut items_changed = false;
let total_changes = changelog.changes.len();
if total_changes > 0 {
for change in changelog.changes {
match change {
Change::Insert(item) => response.created.push(item.into()),
Change::Update(item) => {
items_changed = true;
response.updated.push(item.into())
}
Change::Delete(item) => response.destroyed.push(item.into()),
Change::ChildUpdate(item) => response.updated.push(item.into()),
};
}
}
response.new_state = if response.has_more_changes {
State::new_intermediate(
changelog.from_change_id,
changelog.to_change_id,
items_sent + max_changes,
)
} else {
State::new_exact(changelog.to_change_id)
};
if !response.updated.is_empty() && !items_changed && collection == Collection::Mailbox {
response.updated_properties = vec![
Property::TotalEmails,
Property::UnreadEmails,
Property::TotalThreads,
Property::UnreadThreads,
]
.into()
}
Ok(response)
}
async fn query_changes(
&self,
account_id: u32,
collection: Collection,
query: Query,
) -> Result<Option<Changes>, MethodError> {
self.store
.changes(account_id, collection, query)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "changes",
account_id = account_id,
collection = ?collection,
error = ?err,
"Failed to query changes.");
MethodError::ServerPartialFail
})
}
}

View file

@ -1,43 +1,3 @@
use jmap_proto::{
error::method::MethodError,
types::{collection::Collection, state::State},
};
use crate::JMAP;
impl JMAP {
pub async fn get_state(
&self,
account_id: u32,
collection: Collection,
) -> Result<State, MethodError> {
match self.store.get_last_change_id(account_id, collection).await {
Ok(id) => Ok(id.into()),
Err(err) => {
tracing::error!(event = "error",
context = "store",
account_id = account_id,
collection = ?collection,
error = ?err,
"Failed to obtain state");
Err(MethodError::ServerPartialFail)
}
}
}
pub async fn assert_state(
&self,
account_id: u32,
collection: Collection,
if_in_state: &Option<State>,
) -> Result<State, MethodError> {
let old_state: State = self.get_state(account_id, collection).await?;
if let Some(if_in_state) = if_in_state {
if &old_state != if_in_state {
return Err(MethodError::StateMismatch);
}
}
Ok(old_state)
}
}
pub mod get;
pub mod state;
pub mod write;

View file

@ -0,0 +1,43 @@
use jmap_proto::{
error::method::MethodError,
types::{collection::Collection, state::State},
};
use crate::JMAP;
impl JMAP {
pub async fn get_state(
&self,
account_id: u32,
collection: Collection,
) -> Result<State, MethodError> {
match self.store.get_last_change_id(account_id, collection).await {
Ok(id) => Ok(id.into()),
Err(err) => {
tracing::error!(event = "error",
context = "store",
account_id = account_id,
collection = ?collection,
error = ?err,
"Failed to obtain state");
Err(MethodError::ServerPartialFail)
}
}
}
pub async fn assert_state(
&self,
account_id: u32,
collection: Collection,
if_in_state: &Option<State>,
) -> Result<State, MethodError> {
let old_state: State = self.get_state(account_id, collection).await?;
if let Some(if_in_state) = if_in_state {
if &old_state != if_in_state {
return Err(MethodError::StateMismatch);
}
}
Ok(old_state)
}
}

View file

@ -0,0 +1,56 @@
use jmap_proto::{error::method::MethodError, types::state::State};
use store::write::{log::ChangeLogBuilder, BatchBuilder};
use crate::JMAP;
impl JMAP {
pub async fn begin_changes(&self, account_id: u32) -> Result<ChangeLogBuilder, MethodError> {
Ok(ChangeLogBuilder::with_change_id(
self.store
.assign_change_id(account_id)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "change_log",
error = ?err,
"Failed to assign changeId.");
MethodError::ServerPartialFail
})?,
))
}
pub async fn commit_changes(
&self,
account_id: u32,
mut changes: ChangeLogBuilder,
) -> Result<State, MethodError> {
if changes.change_id == u64::MAX {
changes.change_id = self
.store
.assign_change_id(account_id)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "change_log",
error = ?err,
"Failed to assign changeId.");
MethodError::ServerPartialFail
})?;
}
let state = State::from(changes.change_id);
let mut builder = BatchBuilder::new();
builder.with_account_id(account_id).custom(changes);
self.store.write(builder.build()).await.map_err(|err| {
tracing::error!(
event = "error",
context = "change_log",
error = ?err,
"Failed to write changes.");
MethodError::ServerPartialFail
})?;
Ok(state)
}
}

View file

@ -19,34 +19,32 @@ impl JMAP {
mut request: GetRequest<GetArguments>,
) -> Result<GetResponse, MethodError> {
let ids = request.unwrap_ids(self.config.get_max_objects)?;
let properties = request.unwrap_properties().unwrap_or_else(|| {
vec![
Property::Id,
Property::BlobId,
Property::ThreadId,
Property::MailboxIds,
Property::Keywords,
Property::Size,
Property::ReceivedAt,
Property::MessageId,
Property::InReplyTo,
Property::References,
Property::Sender,
Property::From,
Property::To,
Property::Cc,
Property::Bcc,
Property::ReplyTo,
Property::Subject,
Property::SentAt,
Property::HasAttachment,
Property::Preview,
Property::BodyValues,
Property::TextBody,
Property::HtmlBody,
Property::Attachments,
]
});
let properties = request.unwrap_properties(&[
Property::Id,
Property::BlobId,
Property::ThreadId,
Property::MailboxIds,
Property::Keywords,
Property::Size,
Property::ReceivedAt,
Property::MessageId,
Property::InReplyTo,
Property::References,
Property::Sender,
Property::From,
Property::To,
Property::Cc,
Property::Bcc,
Property::ReplyTo,
Property::Subject,
Property::SentAt,
Property::HasAttachment,
Property::Preview,
Property::BodyValues,
Property::TextBody,
Property::HtmlBody,
Property::Attachments,
]);
let body_properties = request.arguments.body_properties.unwrap_or_else(|| {
vec![
Property::PartId,
@ -80,7 +78,7 @@ impl JMAP {
self.get_properties::<u32>(
account_id,
Collection::Email,
&document_ids,
document_ids.iter().copied(),
Property::ThreadId,
)
.await?

View file

@ -7,7 +7,7 @@ use jmap_proto::{
use mail_parser::{HeaderName, RfcHeader};
use store::{
fts::{builder::MAX_TOKEN_LENGTH, Language},
query::{self, sort::Pagination},
query::{self},
roaring::RoaringBitmap,
ValueKey,
};
@ -17,12 +17,12 @@ use crate::JMAP;
impl JMAP {
pub async fn email_query(
&self,
request: QueryRequest<QueryArguments>,
mut request: QueryRequest<QueryArguments>,
) -> Result<QueryResponse, MethodError> {
let account_id = request.account_id.document_id();
let mut filters = Vec::with_capacity(request.filter.len());
for cond in request.filter {
for cond in std::mem::take(&mut request.filter) {
match cond {
Filter::InMailbox(mailbox) => filters.push(query::Filter::is_in_bitmap(
Property::MailboxIds,
@ -215,51 +215,11 @@ impl JMAP {
}
}
let result_set = match self
.store
.filter(account_id, Collection::Email, filters)
.await
{
Ok(result_set) => result_set,
Err(err) => {
tracing::error!(event = "error",
context = "store",
account_id = account_id,
collection = "email",
error = ?err,
"Filter failed");
return Err(MethodError::ServerPartialFail);
}
};
let total = result_set.results.len() as usize;
let (limit_total, limit) = if let Some(limit) = request.limit {
if limit > 0 {
let limit = std::cmp::min(limit, self.config.query_max_results);
(std::cmp::min(limit, total), limit)
} else {
(0, 0)
}
} else {
(
std::cmp::min(self.config.query_max_results, total),
self.config.query_max_results,
)
};
let mut response = QueryResponse {
account_id: request.account_id,
query_state: self.get_state(account_id, Collection::Email).await?,
can_calculate_changes: true,
position: 0,
ids: vec![],
total: if request.calculate_total.unwrap_or(false) {
Some(total)
} else {
None
},
limit: if total > limit { Some(limit) } else { None },
};
let (response, result_set, paginate) = self
.query(account_id, Collection::Email, filters, &request)
.await?;
if limit_total > 0 {
if let Some(paginate) = paginate {
// Parse sort criteria
let mut comparators = Vec::with_capacity(request.sort.as_ref().map_or(1, |s| s.len()));
for comparator in request
@ -325,48 +285,23 @@ impl JMAP {
}
// Sort results
let result = match self
.store
.sort(
result_set,
comparators,
Pagination::new(
limit_total,
request.position.unwrap_or(0),
request.anchor.map(|a| a.document_id()),
request.anchor_offset.unwrap_or(0),
ValueKey::new(account_id, Collection::Email, 0, Property::ThreadId).into(),
request.arguments.collapse_threads.unwrap_or(false),
),
)
.await
{
Ok(result) => result,
Err(err) => {
tracing::error!(event = "error",
context = "store",
account_id = account_id,
collection = "email",
error = ?err,
"Sort failed");
return Err(MethodError::ServerPartialFail);
}
};
// Prepare response
if result.found_anchor {
response.position = result.position;
response.ids = result
.ids
.into_iter()
.map(|id| id.into())
.collect::<Vec<_>>();
} else {
return Err(MethodError::AnchorNotFound);
}
self.sort(
result_set,
comparators,
paginate
.with_prefix_key(ValueKey::new(
account_id,
Collection::Email,
0,
Property::ThreadId,
))
.with_prefix_unique(request.arguments.collapse_threads.unwrap_or(false)),
response,
)
.await
} else {
Ok(response)
}
Ok(response)
}
async fn thread_keywords(

View file

@ -5,11 +5,12 @@ use jmap_proto::{
method::MethodError,
set::{SetError, SetErrorType},
},
method::set::{SetRequest, SetResponse},
method::set::{RequestArguments, SetRequest, SetResponse},
object::Object,
response::Response,
types::{
collection::Collection,
id::Id,
keyword::Keyword,
property::Property,
value::{MaybePatchValue, SetValue, Value},
@ -26,11 +27,12 @@ use mail_builder::{
use mail_parser::parsers::fields::thread::thread_name;
use store::{
ahash::AHashSet,
fts::term_index::TokenIndex,
write::{
assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, DeserializeFrom, SerializeInto,
ToBitmaps, F_BITMAP, F_CLEAR, F_INDEX, F_VALUE,
},
BlobKind, Serialize,
BlobKind, Serialize, ValueKey,
};
use crate::JMAP;
@ -43,7 +45,7 @@ use super::{
impl JMAP {
pub async fn email_set(
&self,
mut request: SetRequest,
mut request: SetRequest<RequestArguments>,
response: &Response,
) -> Result<SetResponse, MethodError> {
// Prepare response
@ -61,6 +63,9 @@ impl JMAP {
let remove = "fdf";
mailbox_ids.insert(0);
mailbox_ids.insert(1);
mailbox_ids.insert(2);
mailbox_ids.insert(3);
mailbox_ids.insert(4);
let will_destroy = request.unwrap_destroy();
@ -103,7 +108,14 @@ impl JMAP {
// Parse properties
for item in object.iterate_and_eval_references(response) {
match item? {
let item = match item {
Ok(item) => item,
Err(err) => {
set_response.not_created.append(id, err);
continue 'create;
}
};
match item {
(Property::MailboxIds, MaybePatchValue::Value(Value::List(ids))) => {
mailboxes = ids
.into_iter()
@ -684,6 +696,7 @@ impl JMAP {
}
// Process updates
let mut changes = ChangeLogBuilder::new();
'update: for (id, object) in request.unwrap_update() {
// Make sure id won't be destroyed
if will_destroy.contains(&id) {
@ -707,7 +720,7 @@ impl JMAP {
account_id,
Collection::Email,
document_id,
Property::MailboxIds,
Property::Keywords,
)
.await?,
) {
@ -717,8 +730,21 @@ impl JMAP {
continue 'update;
};
// Prepare write batch
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email);
for item in object.iterate_and_eval_references(response) {
match item? {
let item = match item {
Ok(item) => item,
Err(err) => {
set_response.not_updated.append(id, err);
continue 'update;
}
};
match item {
(Property::MailboxIds, MaybePatchValue::Value(Value::List(ids))) => {
mailboxes.set(
ids.into_iter()
@ -764,28 +790,9 @@ impl JMAP {
continue 'update;
}
// Prepare write batch
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email)
.update_document(document_id);
// Log change
batch.update_document(document_id);
let mut changed_mailboxes = AHashSet::new();
let mut changes = ChangeLogBuilder::with_change_id(
self.store
.assign_change_id(account_id)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "email_set",
error = ?err,
"Failed to assign changeId for email update.");
MethodError::ServerPartialFail
})?,
);
changes.log_update(Collection::Email, id);
// Process keywords
@ -849,19 +856,66 @@ impl JMAP {
}
// Write changes
batch.custom(changes);
self.store.write(batch.build()).await.map_err(|err| {
tracing::error!(
event = "error",
context = "email_set",
error = ?err,
"Failed to write message changes to database.");
MethodError::ServerPartialFail
})?;
if !batch.is_empty() {
match self.store.write(batch.build()).await {
Ok(_) => {
// Add to updated list
set_response.updated.append(id, None);
}
Err(store::Error::AssertValueFailed) => {
set_response.not_updated.append(
id,
SetError::forbidden().with_description(
"Another process modified this message, please try again.",
),
);
}
Err(err) => {
tracing::error!(
event = "error",
context = "email_set",
error = ?err,
"Failed to write message changes to database.");
return Err(MethodError::ServerPartialFail);
}
}
}
}
for destroy_id in will_destroy {
// Todo
// Process deletions
if !will_destroy.is_empty() {
let email_ids = self
.get_document_ids(account_id, Collection::Email)
.await?
.unwrap_or_default();
for destroy_id in will_destroy {
if email_ids.contains(destroy_id.document_id()) {
match self
.email_delete(account_id, destroy_id.document_id())
.await?
{
Ok(change) => {
changes.merge(change);
set_response.destroyed.push(destroy_id);
}
Err(err) => {
set_response
.not_destroyed
.append(destroy_id, SetError::not_found());
}
}
} else {
set_response
.not_destroyed
.append(destroy_id, SetError::not_found());
}
}
}
if !changes.is_empty() {
set_response.new_state = self.commit_changes(account_id, changes).await?.into();
} else if !set_response.created.is_empty() {
set_response.new_state = self.get_state(account_id, Collection::Email).await?.into();
}
Ok(set_response)
@ -871,11 +925,14 @@ impl JMAP {
&self,
account_id: u32,
document_id: u32,
batch: &mut BatchBuilder,
changes: &mut ChangeLogBuilder,
) -> Result<bool, MethodError> {
) -> Result<Result<ChangeLogBuilder, SetError>, MethodError> {
// Create batch
let mut batch = BatchBuilder::new();
let mut changes = ChangeLogBuilder::with_change_id(0);
// Delete document
batch
.with_account_id(account_id)
.with_collection(Collection::Email)
.delete_document(document_id);
@ -891,7 +948,14 @@ impl JMAP {
{
mailboxes
} else {
return Ok(false);
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
document_id = document_id,
"Failed to fetch mailboxIds.",
);
return Ok(Err(SetError::not_found()));
};
for mailbox_id in &mailboxes.inner {
changes.log_child_update(Collection::Mailbox, *mailbox_id);
@ -918,10 +982,18 @@ impl JMAP {
F_VALUE | F_BITMAP | F_CLEAR,
);
} else {
return Ok(false);
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
document_id = document_id,
"Failed to fetch keywords.",
);
return Ok(Err(SetError::not_found()));
};
// Remove threadIds
let mut delete_thread_id = None;
if let Some(thread_id) = self
.get_property::<u32>(
account_id,
@ -941,10 +1013,7 @@ impl JMAP {
changes.log_child_update(Collection::Thread, thread_id);
} else {
// Thread is empty, delete it
batch
.with_collection(Collection::Thread)
.delete_document(thread_id)
.with_collection(Collection::Email);
delete_thread_id = thread_id.into();
changes.log_delete(Collection::Thread, thread_id);
}
@ -954,11 +1023,29 @@ impl JMAP {
thread_id,
F_VALUE | F_BITMAP | F_CLEAR,
);
// Log message deletion
changes.log_delete(Collection::Email, Id::from_parts(thread_id, document_id));
} else {
return Ok(false);
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
thread_id = thread_id,
document_id = document_id,
"Failed to fetch thread tags.",
);
return Ok(Err(SetError::not_found()));
}
} else {
return Ok(false);
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
document_id = document_id,
"Failed to fetch threadId.",
);
return Ok(Err(SetError::not_found()));
}
// Obtain message metadata
@ -967,15 +1054,25 @@ impl JMAP {
account_id,
Collection::Email,
document_id,
Property::ThreadId,
Property::BodyStructure,
)
.await?
{
metadata
} else {
return Ok(false);
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
document_id = document_id,
"Failed to fetch message metadata.",
);
return Ok(Err(SetError::not_found()));
};
// Delete metadata
batch.value(Property::BodyStructure, (), F_VALUE | F_CLEAR);
// Remove properties from index
for (property, value) in metadata.properties {
match (&property, value) {
@ -1036,8 +1133,60 @@ impl JMAP {
}
}
// Delete metadata
batch.value(Property::BodyStructure, (), F_VALUE | F_CLEAR);
// Delete term index
if let Some(token_index) = self
.store
.get_value::<TokenIndex>(ValueKey::term_index(
account_id,
Collection::Email,
document_id,
))
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "email_delete",
error = ?err,
"Failed to deserialize term index.");
MethodError::ServerPartialFail
})?
{
batch.custom(token_index);
} else {
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
document_id = document_id,
"Failed to fetch term index.",
);
return Ok(Err(SetError::not_found()));
}
// Delete threadId
if let Some(thread_id) = delete_thread_id {
batch
.with_collection(Collection::Thread)
.delete_document(thread_id);
}
// Commit batch
match self.store.write(batch.build()).await {
Ok(_) => (),
Err(store::Error::AssertValueFailed) => {
return Ok(Err(SetError::forbidden().with_description(
"Another process modified this message, please try again.",
)));
}
Err(err) => {
tracing::error!(
event = "error",
context = "email_delete",
error = ?err,
"Failed to commit batch.");
return Err(MethodError::ServerPartialFail);
}
}
// Delete blob
self.store
@ -1055,7 +1204,7 @@ impl JMAP {
MethodError::ServerPartialFail
})?;
Ok(true)
Ok(Ok(changes))
}
}

View file

@ -1,13 +1,20 @@
use api::session::BaseCapabilities;
use jmap_proto::{
error::method::MethodError,
method::set::{SetRequest, SetResponse},
method::{
query::{QueryRequest, QueryResponse},
set::{SetRequest, SetResponse},
},
request::reference::MaybeReference,
types::{collection::Collection, property::Property},
};
use store::{
ahash::AHashMap, fts::Language, roaring::RoaringBitmap, write::BitmapFamily, BitmapKey,
Deserialize, Serialize, Store, ValueKey,
ahash::AHashMap,
fts::Language,
query::{sort::Pagination, Comparator, Filter, ResultSet, SortedResultSet},
roaring::RoaringBitmap,
write::BitmapFamily,
BitmapKey, Deserialize, Serialize, Store, ValueKey,
};
use utils::{map::vec_map::VecMap, UnwrapFailure};
@ -15,6 +22,7 @@ pub mod api;
pub mod blob;
pub mod changes;
pub mod email;
pub mod mailbox;
pub mod thread;
pub struct JMAP {
@ -25,6 +33,7 @@ pub struct JMAP {
pub struct Config {
pub default_language: Language,
pub query_max_results: usize,
pub changes_max_results: usize,
pub request_max_size: usize,
pub request_max_calls: usize,
@ -60,6 +69,24 @@ impl JMAP {
}
}
pub async fn assign_document_id(
&self,
account_id: u32,
collection: Collection,
) -> Result<u32, MethodError> {
self.store
.assign_document_id(account_id, collection)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "assign_document_id",
error = ?err,
"Failed to assign documentId.");
MethodError::ServerPartialFail
})
}
pub async fn get_property<U>(
&self,
account_id: u32,
@ -95,7 +122,7 @@ impl JMAP {
&self,
account_id: u32,
collection: Collection,
document_ids: &[u32],
document_ids: impl Iterator<Item = u32>,
property: impl AsRef<Property>,
) -> Result<Vec<Option<U>>, MethodError>
where
@ -106,10 +133,7 @@ impl JMAP {
.store
.get_values::<U>(
document_ids
.iter()
.map(|document_id| {
ValueKey::new(account_id, collection, *document_id, property)
})
.map(|document_id| ValueKey::new(account_id, collection, document_id, property))
.collect(),
)
.await
@ -120,7 +144,6 @@ impl JMAP {
context = "store",
account_id = account_id,
collection = ?collection,
document_ids = ?document_ids,
property = ?property,
error = ?err,
"Failed to retrieve properties");
@ -179,9 +202,9 @@ impl JMAP {
}
}
pub async fn prepare_set_response(
pub async fn prepare_set_response<T>(
&self,
request: &SetRequest,
request: &SetRequest<T>,
collection: Collection,
) -> Result<SetResponse, MethodError> {
let n_create = request.create.as_ref().map_or(0, |objs| objs.len());
@ -216,4 +239,125 @@ impl JMAP {
not_destroyed: VecMap::new(),
})
}
pub async fn filter(
&self,
account_id: u32,
collection: Collection,
filters: Vec<Filter>,
) -> Result<ResultSet, MethodError> {
self.store
.filter(account_id, collection, filters)
.await
.map_err(|err| {
tracing::error!(event = "error",
context = "mailbox_set",
account_id = account_id,
collection = ?collection,
error = ?err,
"Failed to execute filter.");
MethodError::ServerPartialFail
})
}
pub async fn query<T>(
&self,
account_id: u32,
collection: Collection,
filters: Vec<Filter>,
request: &QueryRequest<T>,
) -> Result<(QueryResponse, ResultSet, Option<Pagination>), MethodError> {
let result_set = self.filter(account_id, collection, filters).await?;
let total = result_set.results.len() as usize;
let (limit_total, limit) = if let Some(limit) = request.limit {
if limit > 0 {
let limit = std::cmp::min(limit, self.config.query_max_results);
(std::cmp::min(limit, total), limit)
} else {
(0, 0)
}
} else {
(
std::cmp::min(self.config.query_max_results, total),
self.config.query_max_results,
)
};
Ok((
QueryResponse {
account_id: request.account_id,
query_state: self.get_state(account_id, collection).await?,
can_calculate_changes: true,
position: 0,
ids: vec![],
total: if request.calculate_total.unwrap_or(false) {
Some(total)
} else {
None
},
limit: if total > limit { Some(limit) } else { None },
},
result_set,
if limit_total > 0 {
Pagination::new(
limit_total,
request.position.unwrap_or(0),
request.anchor.map(|a| a.document_id()),
request.anchor_offset.unwrap_or(0),
)
.into()
} else {
None
},
))
}
pub async fn sort(
&self,
result_set: ResultSet,
comparators: Vec<Comparator>,
paginate: Pagination,
mut response: QueryResponse,
) -> Result<QueryResponse, MethodError> {
// Sort results
let collection = result_set.collection;
let account_id = result_set.account_id;
response.update_results(
match self.store.sort(result_set, comparators, paginate).await {
Ok(result) => result,
Err(err) => {
tracing::error!(event = "error",
context = "store",
account_id = account_id,
collection = ?collection,
error = ?err,
"Sort failed");
return Err(MethodError::ServerPartialFail);
}
},
)?;
Ok(response)
}
}
trait UpdateResults: Sized {
fn update_results(&mut self, sorted_results: SortedResultSet) -> Result<(), MethodError>;
}
impl UpdateResults for QueryResponse {
fn update_results(&mut self, sorted_results: SortedResultSet) -> Result<(), MethodError> {
// Prepare response
if sorted_results.found_anchor {
self.position = sorted_results.position;
self.ids = sorted_results
.ids
.into_iter()
.map(|id| id.into())
.collect::<Vec<_>>();
Ok(())
} else {
Err(MethodError::AnchorNotFound)
}
}
}

View file

@ -0,0 +1,280 @@
use jmap_proto::{
error::method::MethodError,
method::get::{GetRequest, GetResponse, RequestArguments},
object::Object,
types::{collection::Collection, keyword::Keyword, property::Property, value::Value},
};
use store::{ahash::AHashSet, roaring::RoaringBitmap};
use crate::JMAP;
impl JMAP {
pub async fn mailbox_get(
&self,
mut request: GetRequest<RequestArguments>,
) -> Result<GetResponse, MethodError> {
let ids = request.unwrap_ids(self.config.get_max_objects)?;
let properties = request.unwrap_properties(&[
Property::Id,
Property::Name,
Property::ParentId,
Property::Role,
Property::SortOrder,
Property::IsSubscribed,
Property::TotalEmails,
Property::UnreadEmails,
Property::TotalThreads,
Property::UnreadThreads,
Property::MyRights,
]);
let account_id = request.account_id.document_id();
let mailbox_ids = self
.get_document_ids(account_id, Collection::Mailbox)
.await?
.unwrap_or_default();
let message_ids = self.get_document_ids(account_id, Collection::Email).await?;
let ids = if let Some(ids) = ids {
ids
} else {
mailbox_ids
.iter()
.take(self.config.get_max_objects)
.map(Into::into)
.collect::<Vec<_>>()
};
let fetch_properties = properties.iter().any(|p| {
matches!(
p,
Property::Name
| Property::ParentId
| Property::Role
| Property::SortOrder
| Property::Acl
)
});
let mut response = GetResponse {
account_id: Some(request.account_id),
state: self.get_state(account_id, Collection::Mailbox).await?,
list: Vec::with_capacity(ids.len()),
not_found: vec![],
};
for id in ids {
// Obtain the mailbox object
let document_id = id.document_id();
if !mailbox_ids.contains(document_id) {
response.not_found.push(id);
continue;
}
let mut values = if fetch_properties {
match self
.get_property::<Object<Value>>(
account_id,
Collection::Mailbox,
document_id,
&Property::Value,
)
.await?
{
Some(values) => values,
None => {
response.not_found.push(id);
continue;
}
}
} else {
Object::with_capacity(0)
};
let mut mailbox = Object::with_capacity(properties.len());
for property in &properties {
let value = match property {
Property::Id => Value::Id(id),
Property::Name | Property::Role => values.remove(property),
Property::SortOrder => values
.properties
.remove(property)
.unwrap_or(Value::UnsignedInt(0)),
Property::ParentId => values
.properties
.remove(property)
.map(|parent_id| match parent_id {
Value::Id(value) if value.document_id() > 0 => {
Value::Id((value.document_id() - 1).into())
}
_ => Value::Null,
})
.unwrap_or_default(),
Property::TotalEmails => Value::UnsignedInt(
self.get_tag(
account_id,
Collection::Email,
Property::MailboxIds,
document_id,
)
.await?
.map(|v| v.len())
.unwrap_or(0),
),
Property::UnreadEmails => Value::UnsignedInt(
self.mailbox_unread_tags(account_id, document_id, &message_ids)
.await?
.map(|v| v.len())
.unwrap_or(0),
),
Property::TotalThreads => Value::UnsignedInt(
self.mailbox_count_threads(
account_id,
self.get_tag(
account_id,
Collection::Email,
Property::MailboxIds,
document_id,
)
.await?,
)
.await? as u64,
),
Property::UnreadThreads => Value::UnsignedInt(
self.mailbox_count_threads(
account_id,
self.mailbox_unread_tags(account_id, document_id, &message_ids)
.await?,
)
.await? as u64,
),
Property::MyRights => {
let todo = "add shared";
mailbox_rights_owner()
}
Property::IsSubscribed => values
.properties
.remove(property)
.map(|parent_id| match parent_id {
Value::List(values)
if values.contains(&Value::Id(account_id.into())) =>
{
let todo = "use acl id";
Value::Bool(true)
}
_ => Value::Bool(false),
})
.unwrap_or(Value::Bool(false)),
/*Property::ACL
if acl.is_member(account_id)
|| self
.mail_shared_folders(account_id, &acl.member_of, Acl::Administer)?
.has_access(document_id) =>
{
let mut acl_get = VecMap::new();
for (account_id, acls) in fields.as_ref().unwrap().get_acls() {
if let Some(email) = self.principal_to_email(account_id)? {
acl_get.append(email, acls);
}
}
Value::ACLGet(acl_get)
}*/
_ => Value::Null,
};
mailbox.append(property.clone(), value);
}
}
Ok(response)
}
async fn mailbox_count_threads(
&self,
account_id: u32,
document_ids: Option<RoaringBitmap>,
) -> Result<usize, MethodError> {
if let Some(document_ids) = document_ids {
let mut thread_ids = AHashSet::default();
self.get_properties::<u32>(
account_id,
Collection::Email,
document_ids.into_iter(),
Property::ThreadId,
)
.await?
.into_iter()
.flatten()
.for_each(|thread_id| {
thread_ids.insert(thread_id);
});
Ok(thread_ids.len())
} else {
Ok(0)
}
}
async fn mailbox_unread_tags(
&self,
account_id: u32,
document_id: u32,
message_ids: &Option<RoaringBitmap>,
) -> Result<Option<RoaringBitmap>, MethodError> {
if let (Some(message_ids), Some(mailbox_message_ids)) = (
message_ids,
self.get_tag(
account_id,
Collection::Email,
Property::MailboxIds,
document_id,
)
.await?,
) {
if let Some(mut seen) = self
.get_tag(
account_id,
Collection::Email,
Property::Keywords,
Keyword::Seen,
)
.await?
{
seen ^= message_ids;
seen &= &mailbox_message_ids;
if !seen.is_empty() {
Ok(Some(seen))
} else {
Ok(None)
}
} else {
Ok(mailbox_message_ids.into())
}
} else {
Ok(None)
}
}
}
fn mailbox_rights_owner() -> Value {
Object::with_capacity(9)
.with_property(Property::MayReadItems, true)
.with_property(Property::MayAddItems, true)
.with_property(Property::MayRemoveItems, true)
.with_property(Property::MaySetSeen, true)
.with_property(Property::MaySetKeywords, true)
.with_property(Property::MayCreateChild, true)
.with_property(Property::MayRename, true)
.with_property(Property::MayDelete, true)
.with_property(Property::MaySubmit, true)
.into()
}
/*fn mailbox_rights_shared(acl: Bitmap<Acl>) -> Value {
Object::with_capacity(9)
.with_property(Property::MayReadItems, acl.contains(Acl::ReadItems))
.with_property(Property::MayAddItems, acl.contains(Acl::AddItems))
.with_property(Property::MayRemoveItems, acl.contains(Acl::RemoveItems))
.with_property(Property::MaySetSeen, acl.contains(Acl::ModifyItems))
.with_property(Property::MaySetKeywords, acl.contains(Acl::ModifyItems))
.with_property(Property::MayCreateChild, acl.contains(Acl::CreateChild))
.with_property(Property::MayRename, acl.contains(Acl::Modify))
.with_property(Property::MayDelete, acl.contains(Acl::Delete))
.with_property(Property::MaySubmit, acl.contains(Acl::Submit))
.into()
}*/

View file

@ -0,0 +1,6 @@
pub mod get;
pub mod query;
pub mod set;
pub const INBOX_ID: u32 = 0;
pub const TRASH_ID: u32 = 1;

View file

@ -0,0 +1,237 @@
use jmap_proto::{
error::method::MethodError,
method::query::{Comparator, Filter, QueryRequest, QueryResponse, SortProperty},
object::{mailbox::QueryArguments, Object},
types::{collection::Collection, property::Property, value::Value},
};
use store::{
ahash::{AHashMap, AHashSet},
fts::Language,
query::{self, sort::Pagination},
roaring::RoaringBitmap,
};
use crate::{UpdateResults, JMAP};
impl JMAP {
pub async fn mailbox_query(
&self,
mut request: QueryRequest<QueryArguments>,
) -> Result<QueryResponse, MethodError> {
let todo = "fix primary";
let primary_account_id = request.account_id.document_id();
let account_id = request.account_id.document_id();
let sort_as_tree = request.arguments.sort_as_tree.unwrap_or(false);
let filter_as_tree = request.arguments.filter_as_tree.unwrap_or(false);
let mut filters = Vec::with_capacity(request.filter.len());
for cond in std::mem::take(&mut request.filter) {
match cond {
Filter::ParentId(parent_id) => filters.push(query::Filter::eq(
Property::ParentId,
parent_id.map(|id| id.document_id() + 1).unwrap_or(0),
)),
Filter::Name(name) => {
#[cfg(feature = "test_mode")]
{
// Used for concurrent requests tests
if name == "__sleep" {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
filters.push(query::Filter::has_text(
Property::Name,
&name,
Language::None,
));
}
Filter::Role(role) => {
if let Some(role) = role {
filters.push(query::Filter::has_text(
Property::Role,
&role,
Language::None,
));
} else {
filters.push(query::Filter::Not);
filters.push(query::Filter::is_in_bitmap(Property::Role, ()));
filters.push(query::Filter::End);
}
}
Filter::HasAnyRole(has_role) => {
if !has_role {
filters.push(query::Filter::Not);
}
filters.push(query::Filter::is_in_bitmap(Property::Role, ()));
if !has_role {
filters.push(query::Filter::End);
}
}
Filter::IsSubscribed(is_subscribed) => {
if !is_subscribed {
filters.push(query::Filter::Not);
}
filters.push(query::Filter::eq(
Property::IsSubscribed,
primary_account_id,
));
if !is_subscribed {
filters.push(query::Filter::End);
}
}
Filter::And | Filter::Or | Filter::Not | Filter::Close => {
filters.push(cond.into());
}
other => return Err(MethodError::UnsupportedFilter(other.to_string())),
}
}
let (mut response, mut result_set, mut paginate) = self
.query(account_id, Collection::Mailbox, filters, &request)
.await?;
// Build mailbox tree
let mut hierarchy = AHashMap::default();
let mut tree = AHashMap::default();
if (filter_as_tree || sort_as_tree)
&& (paginate.is_some()
|| (response.total.map_or(false, |total| total > 0) && filter_as_tree))
{
for document_id in self
.get_document_ids(account_id, Collection::Mailbox)
.await?
.unwrap_or_default()
{
let parent_id = self
.get_property::<Object<Value>>(
account_id,
Collection::Mailbox,
document_id,
Property::Value,
)
.await?
.and_then(|o| {
o.properties
.get(&Property::ParentId)
.and_then(|id| id.as_id().map(|id| id.document_id()))
})
.unwrap_or(0);
hierarchy.insert(document_id + 1, parent_id);
tree.entry(parent_id)
.or_insert_with(AHashSet::default)
.insert(document_id + 1);
}
if filter_as_tree {
let mut filtered_ids = RoaringBitmap::new();
for document_id in &result_set.results {
let mut keep = false;
let mut jmap_id = document_id + 1;
for _ in 0..self.config.mailbox_max_depth {
if let Some(&parent_id) = hierarchy.get(&jmap_id) {
if parent_id == 0 {
keep = true;
break;
} else if !result_set.results.contains(parent_id - 1) {
break;
} else {
jmap_id = parent_id;
}
} else {
break;
}
}
if keep {
filtered_ids.push(document_id);
}
}
if filtered_ids.len() != result_set.results.len() {
let total = filtered_ids.len() as usize;
if response.total.is_some() {
response.total = Some(total);
}
if let Some(paginate) = &mut paginate {
if paginate.limit > total {
paginate.limit = total;
}
}
result_set.results = filtered_ids;
}
}
}
if let Some(mut paginate) = paginate {
// Parse sort criteria
let mut comparators = Vec::with_capacity(request.sort.as_ref().map_or(1, |s| s.len()));
for comparator in request
.sort
.and_then(|s| if !s.is_empty() { s.into() } else { None })
.unwrap_or_else(|| vec![Comparator::descending(SortProperty::ParentId)])
{
comparators.push(match comparator.property {
SortProperty::Name => {
query::Comparator::field(Property::Name, comparator.is_ascending)
}
SortProperty::SortOrder => {
query::Comparator::field(Property::SortOrder, comparator.is_ascending)
}
SortProperty::ParentId => {
query::Comparator::field(Property::ParentId, comparator.is_ascending)
}
other => return Err(MethodError::UnsupportedSort(other.to_string())),
});
}
// Sort as tree
if sort_as_tree {
let dummy_paginate = Pagination::new(result_set.results.len() as usize, 0, None, 0);
response = self
.sort(result_set, comparators, dummy_paginate, response)
.await?;
let mut stack = Vec::new();
let mut jmap_id = 0;
'outer: for _ in 0..(response.ids.len() * 10 * self.config.mailbox_max_depth) {
let (mut children, mut it) = if let Some(children) = tree.remove(&jmap_id) {
(children, response.ids.iter())
} else if let Some(prev) = stack.pop() {
prev
} else {
break;
};
while let Some(&id) = it.next() {
jmap_id = id.document_id() + 1;
if children.remove(&jmap_id) {
if !paginate.add(0, id.document_id()) {
break 'outer;
} else {
stack.push((children, it));
continue 'outer;
}
}
}
if !children.is_empty() {
jmap_id = *children.iter().next().unwrap();
children.remove(&jmap_id);
stack.push((children, it));
}
}
response.update_results(paginate.build())?;
} else {
response = self
.sort(result_set, comparators, paginate, response)
.await?;
}
}
Ok(response)
}
}

View file

@ -0,0 +1,656 @@
use jmap_proto::{
error::{
method::MethodError,
set::{SetError, SetErrorType},
},
method::set::{SetRequest, SetResponse},
object::{
index::{IndexAs, IndexProperty, ObjectIndexBuilder},
mailbox::SetArguments,
Object,
},
response::Response,
types::{
collection::Collection,
id::Id,
property::Property,
value::{MaybePatchValue, SetValue, Value},
},
};
use store::{
query::Filter,
roaring::RoaringBitmap,
write::{assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, F_BITMAP, F_CLEAR, F_VALUE},
};
use crate::JMAP;
use super::{INBOX_ID, TRASH_ID};
struct SetContext<'x> {
account_id: u32,
primary_id: u32,
response: &'x Response,
mailbox_ids: RoaringBitmap,
will_destroy: Vec<Id>,
}
static SCHEMA: &[IndexProperty] = &[
IndexProperty::new(Property::Name)
.index_as(IndexAs::Text {
tokenize: true,
index: true,
})
.required(),
IndexProperty::new(Property::Role)
.index_as(IndexAs::Text {
tokenize: false,
index: true,
})
.required(),
IndexProperty::new(Property::Role).index_as(IndexAs::HasProperty),
IndexProperty::new(Property::ParentId).index_as(IndexAs::Integer),
IndexProperty::new(Property::SortOrder).index_as(IndexAs::Integer),
IndexProperty::new(Property::IsSubscribed).index_as(IndexAs::IntegerList),
];
impl JMAP {
#[allow(clippy::blocks_in_if_conditions)]
pub async fn mailbox_set(
&self,
mut request: SetRequest<SetArguments>,
response: &Response,
) -> Result<SetResponse, MethodError> {
// Prepare response
let account_id = request.account_id.document_id();
let mut set_response = self
.prepare_set_response(&request, Collection::Mailbox)
.await?;
let on_destroy_remove_emails = request.arguments.on_destroy_remove_emails.unwrap_or(false);
let mut ctx = SetContext {
account_id,
primary_id: account_id,
response,
mailbox_ids: self
.get_document_ids(account_id, Collection::Mailbox)
.await?
.unwrap_or_default(),
will_destroy: request.unwrap_destroy(),
};
// Process creates
let mut changes = ChangeLogBuilder::new();
'create: for (id, object) in request.unwrap_create() {
match self.mailbox_set_item(object, None, &ctx).await? {
Ok(builder) => {
let mut batch = BatchBuilder::new();
let document_id = self
.assign_document_id(account_id, Collection::Mailbox)
.await?;
batch
.with_account_id(account_id)
.with_collection(Collection::Mailbox)
.create_document(document_id)
.custom(builder);
changes.log_insert(Collection::Mailbox, document_id);
ctx.mailbox_ids.insert(document_id);
self.store.write(batch.build()).await.map_err(|err| {
tracing::error!(
event = "error",
context = "mailbox_set",
account_id = account_id,
error = ?err,
"Failed to create mailbox(es).");
MethodError::ServerPartialFail
})?;
set_response.created(id, document_id);
}
Err(err) => {
set_response.not_created.append(id, err);
continue 'create;
}
}
}
// Process updates
'update: for (id, object) in request.unwrap_update() {
// Obtain mailbox
let document_id = id.document_id();
if let Some(mut mailbox) = self
.get_property::<HashedValue<Object<Value>>>(
account_id,
Collection::Mailbox,
document_id,
Property::Value,
)
.await?
{
match self
.mailbox_set_item(object, (document_id, mailbox.take()).into(), &ctx)
.await?
{
Ok(builder) => {
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Mailbox)
.create_document(document_id)
.assert_value(Property::Value, &mailbox)
.custom(builder);
if !batch.is_empty() {
changes.log_update(Collection::Mailbox, document_id);
match self.store.write(batch.build()).await {
Ok(_) => (),
Err(store::Error::AssertValueFailed) => {
set_response.not_updated.append(id, SetError::forbidden().with_description(
"Another process modified this mailbox, please try again.",
));
continue 'update;
}
Err(err) => {
tracing::error!(
event = "error",
context = "mailbox_set",
account_id = account_id,
error = ?err,
"Failed to update mailbox(es).");
return Err(MethodError::ServerPartialFail);
}
}
}
set_response.updated.append(id, None);
}
Err(err) => {
set_response.not_updated.append(id, err);
continue 'update;
}
}
} else {
set_response.not_updated.append(id, SetError::not_found());
}
}
// Process deletions
'destroy: for id in ctx.will_destroy {
let document_id = id.document_id();
// Internal folders cannot be deleted
if document_id == INBOX_ID || document_id == TRASH_ID {
set_response.not_destroyed.append(
id,
SetError::forbidden()
.with_description("You are not allowed to delete Inbox or Trash folders."),
);
continue;
}
// Verify that this mailbox does not have sub-mailboxes
if !self
.filter(
account_id,
Collection::Mailbox,
vec![Filter::eq(Property::ParentId, document_id + 1)],
)
.await?
.results
.is_empty()
{
set_response.not_destroyed.append(
id,
SetError::new(SetErrorType::MailboxHasChild)
.with_description("Mailbox has at least one children."),
);
continue;
}
// Verify that the mailbox is empty
if let Some(message_ids) = self
.get_tag(
account_id,
Collection::Email,
Property::MailboxIds,
document_id,
)
.await?
{
if on_destroy_remove_emails {
// If the message is in multiple mailboxes, untag it from the current mailbox,
// otherwise delete it.
for message_id in message_ids {
// Obtain mailboxIds
if let Some(mailbox_ids) = self
.get_property::<HashedValue<Vec<u32>>>(
account_id,
Collection::Email,
message_id,
Property::MailboxIds,
)
.await?
.and_then(|mut ids| {
let idx = ids.inner.iter().position(|&id| id == document_id)?;
ids.inner.swap_remove(idx);
Some(ids)
})
{
if !mailbox_ids.inner.is_empty() {
// Obtain threadId
if let Some(thread_id) = self
.get_property::<u32>(
account_id,
Collection::Email,
message_id,
Property::ThreadId,
)
.await?
{
// Untag message from mailbox
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email)
.update_document(message_id)
.assert_value(Property::MailboxIds, &mailbox_ids)
.value(Property::MailboxIds, mailbox_ids.inner, F_VALUE)
.value(
Property::MailboxIds,
document_id,
F_BITMAP | F_CLEAR,
);
match self.store.write(batch.build()).await {
Ok(_) => changes.log_update(
Collection::Email,
Id::from_parts(thread_id, message_id),
),
Err(store::Error::AssertValueFailed) => {
set_response.not_destroyed.append(
id,
SetError::forbidden().with_description(
concat!("Another process modified a message in this mailbox ",
"while deleting it, please try again.")
),
);
continue 'destroy;
}
Err(err) => {
tracing::error!(
event = "error",
context = "mailbox_set",
account_id = account_id,
mailbox_id = document_id,
message_id = message_id,
error = ?err,
"Failed to update message while deleting mailbox.");
return Err(MethodError::ServerPartialFail);
}
}
} else {
tracing::debug!(
event = "error",
context = "mailbox_set",
account_id = account_id,
mailbox_id = document_id,
message_id = message_id,
"Message does not have a threadId, skipping."
);
}
} else {
// Delete message
if let Ok(mut change) =
self.email_delete(account_id, document_id).await?
{
change.changes.remove(&(Collection::Mailbox as u8));
changes.merge(change);
}
}
} else {
tracing::debug!(
event = "error",
context = "mailbox_set",
account_id = account_id,
mailbox_id = document_id,
message_id = message_id,
"Message is not in the mailbox, skipping."
);
}
}
} else {
set_response.not_destroyed.append(
id,
SetError::new(SetErrorType::MailboxHasEmail)
.with_description("Mailbox is not empty."),
);
continue;
}
}
// Obtain mailbox
if let Some(mailbox) = self
.get_property::<HashedValue<Object<Value>>>(
account_id,
Collection::Mailbox,
document_id,
Property::Value,
)
.await?
{
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Mailbox)
.delete_document(document_id)
.assert_value(Property::Value, &mailbox)
.custom(ObjectIndexBuilder::new(SCHEMA).with_current(mailbox.inner));
match self.store.write(batch.build()).await {
Ok(_) => changes.log_delete(Collection::Mailbox, document_id),
Err(store::Error::AssertValueFailed) => {
set_response.not_destroyed.append(
id,
SetError::forbidden().with_description(concat!(
"Another process modified this mailbox ",
"while deleting it, please try again."
)),
);
}
Err(err) => {
tracing::error!(
event = "error",
context = "mailbox_set",
account_id = account_id,
document_id = document_id,
error = ?err,
"Failed to delete mailbox.");
return Err(MethodError::ServerPartialFail);
}
}
} else {
set_response.not_destroyed.append(id, SetError::not_found());
}
}
// Write changes
if !changes.is_empty() {
set_response.new_state = self.commit_changes(account_id, changes).await?.into();
}
Ok(set_response)
}
#[allow(clippy::blocks_in_if_conditions)]
async fn mailbox_set_item(
&self,
changes_: Object<SetValue>,
update: Option<(u32, Object<Value>)>,
ctx: &SetContext<'_>,
) -> Result<Result<ObjectIndexBuilder, SetError>, MethodError> {
// Parse properties
let mut changes = Object::with_capacity(changes_.properties.len());
for item in changes_.iterate_and_eval_references(ctx.response) {
let item = match item {
Ok(item) => item,
Err(err) => {
return Ok(Err(err));
}
};
match item {
(Property::Name, MaybePatchValue::Value(Value::Text(value))) => {
let value = value.trim();
if !value.is_empty() && value.len() < self.config.mailbox_name_max_len {
changes.append(Property::Name, Value::Text(value.to_string()));
} else {
return Ok(Err(SetError::invalid_properties()
.with_property(Property::Name)
.with_description(
if !value.is_empty() {
"Mailbox name is too long."
} else {
"Mailbox name cannot be empty."
}
.to_string(),
)));
}
}
(Property::ParentId, MaybePatchValue::Value(Value::Id(value))) => {
let parent_id = value.document_id();
if ctx.will_destroy.contains(&value) {
return Ok(Err(SetError::will_destroy()
.with_description("Parent ID will be destroyed.")));
} else if !ctx.mailbox_ids.contains(parent_id) {
return Ok(Err(SetError::invalid_properties()
.with_description("Parent ID does not exist.")));
}
changes.append(Property::ParentId, Value::Id((parent_id + 1).into()));
}
(Property::ParentId, MaybePatchValue::Value(Value::Null)) => {
changes.append(Property::ParentId, Value::Id(0u64.into()))
}
(Property::IsSubscribed, MaybePatchValue::Value(Value::Bool(subscribe))) => {
let fixme = "true";
let account_id = Value::Id(ctx.primary_id.into());
let mut new_value = None;
if let Some((_, current_fields)) = update.as_ref() {
if let Value::List(subscriptions) =
current_fields.get(&Property::IsSubscribed)
{
if subscribe {
if !subscriptions.contains(&account_id) {
let mut current_subscriptions = subscriptions.clone();
current_subscriptions.push(account_id.clone());
new_value = Value::List(current_subscriptions).into();
} else {
continue;
}
} else if subscriptions.contains(&account_id) {
if subscriptions.len() > 1 {
new_value = Value::List(
subscriptions
.iter()
.filter(|id| *id != &account_id)
.cloned()
.collect(),
)
.into();
} else {
new_value = Value::Null.into();
}
} else {
continue;
}
}
}
changes.append(
Property::IsSubscribed,
if let Some(new_value) = new_value {
new_value
} else if subscribe {
Value::List(vec![account_id])
} else {
continue;
},
);
}
(Property::Role, MaybePatchValue::Value(Value::Text(value))) => {
let role = value.trim().to_lowercase();
if [
"inbox", "trash", "spam", "junk", "drafts", "archive", "sent",
]
.contains(&role.as_str())
{
changes.append(Property::Role, Value::Text(role));
} else {
return Ok(Err(SetError::invalid_properties()
.with_property(Property::Role)
.with_description(format!("Invalid role {role:?}."))));
}
}
(Property::Role, MaybePatchValue::Value(Value::Null)) => {
changes.append(Property::Role, Value::Null)
}
(Property::SortOrder, MaybePatchValue::Value(Value::UnsignedInt(value))) => {
changes.append(Property::SortOrder, Value::UnsignedInt(value));
}
(Property::Acl, _) => {
todo!()
}
(property, _) => {
return Ok(Err(SetError::invalid_properties()
.with_property(property)
.with_description("Invalid property or value.".to_string())))
}
};
}
// Validate depth and circular parent-child relationship
if let Value::Id(mailbox_parent_id) = changes.get(&Property::ParentId) {
let current_mailbox_id = update
.as_ref()
.map_or(u32::MAX, |(mailbox_id, _)| *mailbox_id + 1);
let mut mailbox_parent_id = mailbox_parent_id.document_id();
let mut success = false;
for _ in 0..self.config.mailbox_max_depth {
if mailbox_parent_id == current_mailbox_id {
return Ok(Err(SetError::invalid_properties()
.with_property(Property::ParentId)
.with_description("Mailbox cannot be a parent of itself.")));
} else if mailbox_parent_id == 0 {
success = true;
break;
}
let parent_document_id = mailbox_parent_id - 1;
if let Some(mut fields) = self
.get_property::<Object<Value>>(
ctx.account_id,
Collection::Mailbox,
parent_document_id,
Property::Value,
)
.await?
{
mailbox_parent_id = fields
.properties
.remove(&Property::ParentId)
.and_then(|v| v.try_unwrap_id().map(|id| id.document_id()))
.unwrap_or(0);
} else if ctx.mailbox_ids.contains(parent_document_id) {
// Parent mailbox is probably created within the same request
success = true;
break;
} else {
return Ok(Err(SetError::invalid_properties()
.with_property(Property::ParentId)
.with_description("Mailbox parent does not exist.")));
}
}
if !success {
return Ok(Err(SetError::invalid_properties()
.with_property(Property::ParentId)
.with_description(
"Mailbox parent-child relationship is too deep.",
)));
}
} else if update.is_none() {
// Set parentId if the field is missing
changes.append(Property::ParentId, Value::Id(0u64.into()));
}
// Verify that the mailbox role is unique.
if let Value::Text(mailbox_role) = changes.get(&Property::Role) {
if update
.as_ref()
.map(|(_, update)| update.get(&Property::Role))
.and_then(|v| v.as_string())
.unwrap_or_default()
!= mailbox_role
{
if !self
.filter(
ctx.account_id,
Collection::Mailbox,
vec![Filter::eq(Property::Role, mailbox_role.as_str())],
)
.await?
.results
.is_empty()
{
return Ok(Err(SetError::invalid_properties()
.with_property(Property::Role)
.with_description(format!(
"A mailbox with role '{}' already exists.",
mailbox_role
))));
}
// Role of internal folders cannot be modified
if update.as_ref().map_or(false, |(document_id, _)| {
*document_id == INBOX_ID || *document_id == TRASH_ID
}) {
return Ok(Err(SetError::invalid_properties()
.with_property(Property::Role)
.with_description(
"You are not allowed to change the role of Inbox or Trash folders.",
)));
}
}
}
// Verify that the mailbox name is unique.
if let Value::Text(mailbox_name) = changes.get(&Property::Name) {
// Obtain parent mailbox id
if let Some(parent_mailbox_id) = if let Some(mailbox_parent_id) = &changes
.properties
.get(&Property::ParentId)
.and_then(|id| id.as_id().map(|id| id.document_id()))
{
(*mailbox_parent_id).into()
} else if let Some((_, current_fields)) = &update {
if current_fields
.properties
.get(&Property::Name)
.and_then(|n| n.as_string())
!= Some(mailbox_name)
{
current_fields
.properties
.get(&Property::ParentId)
.and_then(|id| id.as_id().map(|id| id.document_id()))
.unwrap_or_default()
.into()
} else {
None
}
} else {
0.into()
} {
if !self
.filter(
ctx.account_id,
Collection::Mailbox,
vec![
Filter::eq(Property::Name, mailbox_name.as_str()),
Filter::eq(Property::ParentId, parent_mailbox_id),
],
)
.await?
.results
.is_empty()
{
return Ok(Err(SetError::invalid_properties()
.with_property(Property::Name)
.with_description(format!(
"A mailbox with name '{}' already exists.",
mailbox_name
))));
}
}
}
// Validate
Ok(ObjectIndexBuilder::new(SCHEMA)
.with_changes(changes)
.with_current_opt(update.map(|(_, current)| current))
.validate())
}
}

View file

@ -26,8 +26,8 @@ impl JMAP {
.collect()
};
let add_email_ids = request
.unwrap_properties()
.map_or(true, |properties| properties.contains(&Property::EmailIds));
.properties
.map_or(true, |p| p.unwrap().contains(&Property::EmailIds));
let mut response = GetResponse {
account_id: Some(request.account_id),
state: self.get_state(account_id, Collection::Thread).await?,
@ -49,14 +49,7 @@ impl JMAP {
.sort(
ResultSet::new(account_id, Collection::Email, document_ids.clone()),
vec![Comparator::ascending(Property::ReceivedAt)],
Pagination::new(
document_ids.len() as usize,
0,
None,
0,
None,
false,
),
Pagination::new(document_ids.len() as usize, 0, None, 0),
)
.await
.map_err(|err| {

View file

@ -30,6 +30,7 @@ siphasher = "0.3"
parking_lot = { version = "0.12.1", optional = true }
lru-cache = { version = "0.1.2", optional = true }
blake3 = "1.3.3"
tracing = "0.1"
[features]
default = ["sqlite"]

View file

@ -265,6 +265,7 @@ impl ReadTransaction<'_> {
let mut rows = query.query([&begin, &end])?;
while let Some(row) = rows.next()? {
//TODO remove subspace in Foundation
let key = row.get_ref(0)?.as_bytes()?;
let value = row.get_ref(1)?.as_bytes()?;

View file

@ -1,18 +1,18 @@
use std::borrow::Cow;
use std::{borrow::Cow, collections::HashSet};
use ahash::AHashSet;
use utils::map::vec_map::VecMap;
use crate::{
write::{BatchBuilder, IntoOperations, Operation},
Serialize, BLOOM_BIGRAM, BLOOM_TRIGRAM, HASH_EXACT, HASH_STEMMED,
Serialize, HASH_EXACT, HASH_STEMMED,
};
use super::{
bloom::BloomFilter,
lang::{LanguageDetector, MIN_LANGUAGE_SCORE},
ngram::ToNgrams,
stemmer::Stemmer,
tokenizers::space::SpaceTokenizer,
term_index::{TermIndexBuilder, TokenIndex},
tokenizers::{space::SpaceTokenizer, Token},
Language,
};
@ -27,7 +27,7 @@ struct Text<'x> {
pub struct FtsIndexBuilder<'x> {
parts: Vec<Text<'x>>,
tokens: AHashSet<(u8, String)>,
tokens: VecMap<u8, AHashSet<String>>,
detect: LanguageDetector,
default_language: Language,
}
@ -37,7 +37,7 @@ impl<'x> FtsIndexBuilder<'x> {
FtsIndexBuilder {
parts: vec![],
detect: LanguageDetector::new(),
tokens: AHashSet::new(),
tokens: VecMap::new(),
default_language,
}
}
@ -60,14 +60,16 @@ impl<'x> FtsIndexBuilder<'x> {
}
pub fn index_raw(&mut self, field: impl Into<u8>, text: &str) {
let field = field.into();
let tokens = self.tokens.get_mut_or_insert(field.into());
for token in SpaceTokenizer::new(text, MAX_TOKEN_LENGTH) {
self.tokens.insert((field, token));
tokens.insert(token);
}
}
pub fn index_raw_token(&mut self, field: impl Into<u8>, token: impl Into<String>) {
self.tokens.insert((field.into(), token.into()));
self.tokens
.get_mut_or_insert(field.into())
.insert(token.into());
}
}
@ -77,22 +79,27 @@ impl<'x> IntoOperations for FtsIndexBuilder<'x> {
.detect
.most_frequent_language()
.unwrap_or(self.default_language);
let mut term_index = TermIndexBuilder::new();
for part in &self.parts {
for (part_id, part) in self.parts.iter().enumerate() {
let language = if part.language != Language::Unknown {
part.language
} else {
default_language
};
let mut unique_words = AHashSet::new();
let mut phrase_words = Vec::new();
let mut terms = Vec::new();
for token in Stemmer::new(&part.text, language, MAX_TOKEN_LENGTH).collect::<Vec<_>>() {
unique_words.insert((token.word.to_string(), HASH_EXACT));
if let Some(stemmed_word) = token.stemmed_word {
unique_words.insert((stemmed_word.into_owned(), HASH_STEMMED));
if let Some(stemmed_word) = &token.stemmed_word {
unique_words.insert((stemmed_word.to_string(), HASH_STEMMED));
}
phrase_words.push(token.word);
terms.push(term_index.add_stemmed_token(token));
}
if !terms.is_empty() {
term_index.add_terms(part.field, part_id as u32, terms);
}
for (word, family) in unique_words {
@ -100,27 +107,72 @@ impl<'x> IntoOperations for FtsIndexBuilder<'x> {
.ops
.push(Operation::hash(&word, family, part.field, true));
}
}
if phrase_words.len() > 1 {
batch.ops.push(Operation::Value {
field: part.field,
family: BLOOM_BIGRAM,
set: BloomFilter::to_ngrams(&phrase_words, 2).serialize().into(),
});
if phrase_words.len() > 2 {
batch.ops.push(Operation::Value {
field: part.field,
family: BLOOM_TRIGRAM,
set: BloomFilter::to_ngrams(&phrase_words, 3).serialize().into(),
});
for (field, tokens) in self.tokens {
let mut terms = Vec::with_capacity(tokens.len());
for token in tokens {
batch
.ops
.push(Operation::hash(&token, HASH_EXACT, field, true));
terms.push(term_index.add_token(Token {
word: token.into(),
offset: 0,
len: 0,
}));
}
term_index.add_terms(field, 0, terms);
}
batch.ops.push(Operation::Value {
field: u8::MAX,
family: u8::MAX,
set: term_index.serialize().into(),
});
}
}
impl IntoOperations for TokenIndex {
fn build(self, batch: &mut BatchBuilder) {
for term in self.terms {
for (term_ids, is_exact) in [(term.exact_terms, true), (term.stemmed_terms, false)] {
for term_id in term_ids {
if let Some(word) = self.tokens.get(term_id as usize) {
batch.ops.push(Operation::hash(
word,
if is_exact { HASH_EXACT } else { HASH_STEMMED },
term.field_id,
false,
));
}
}
}
}
for (field, token) in self.tokens {
batch
.ops
.push(Operation::hash(&token, HASH_EXACT, field, true));
}
batch.ops.push(Operation::Value {
field: u8::MAX,
family: u8::MAX,
set: None,
});
}
}
pub trait ToTokens {
fn to_tokens(&self) -> HashSet<String>;
}
impl ToTokens for &str {
fn to_tokens(&self) -> HashSet<String> {
let mut tokens = HashSet::new();
for token in SpaceTokenizer::new(self, MAX_TOKEN_LENGTH) {
tokens.insert(token);
}
tokens
}
}
impl ToTokens for &String {
fn to_tokens(&self) -> HashSet<String> {
self.as_str().to_tokens()
}
}

View file

@ -36,7 +36,7 @@ pub mod ngram;
pub mod query;
//pub mod search_snippet;
pub mod stemmer;
//pub mod term_index;
pub mod term_index;
pub mod tokenizers;
#[derive(Debug, PartialEq, Clone, Copy, Hash, Eq, serde::Serialize, serde::Deserialize)]

View file

@ -1,19 +1,11 @@
use std::time::Instant;
use roaring::RoaringBitmap;
use crate::{
fts::{
bloom::{BloomFilter, BloomHashGroup},
builder::MAX_TOKEN_LENGTH,
ngram::ToNgrams,
stemmer::Stemmer,
tokenizers::Tokenizer,
},
BitmapKey, ReadTransaction, ValueKey, BLOOM_BIGRAM, BLOOM_TRIGRAM, HASH_EXACT, HASH_STEMMED,
fts::{builder::MAX_TOKEN_LENGTH, stemmer::Stemmer, tokenizers::Tokenizer},
BitmapKey, ReadTransaction, ValueKey, HASH_EXACT, HASH_STEMMED,
};
use super::Language;
use super::{term_index::TermIndex, Language};
impl ReadTransaction<'_> {
#[maybe_async::maybe_async]
@ -26,10 +18,8 @@ impl ReadTransaction<'_> {
language: Language,
match_phrase: bool,
) -> crate::Result<Option<RoaringBitmap>> {
let real_now = Instant::now();
let (bitmaps, hashes, family) = if match_phrase {
let mut tokens = Vec::new();
if match_phrase {
let mut phrase = Vec::new();
let mut bit_keys = Vec::new();
for token in Tokenizer::new(text, language, MAX_TOKEN_LENGTH) {
let key = BitmapKey::hash(
@ -43,26 +33,66 @@ impl ReadTransaction<'_> {
bit_keys.push(key);
}
tokens.push(token.word);
phrase.push(token.word);
}
let bitmaps = match self.get_bitmaps_intersection(bit_keys).await? {
Some(b) if !b.is_empty() => b,
_ => return Ok(None),
};
match tokens.len() {
match phrase.len() {
0 => return Ok(None),
1 => return Ok(Some(bitmaps)),
2 => (
bitmaps,
<Vec<BloomHashGroup>>::to_ngrams(&tokens, 2),
BLOOM_BIGRAM,
),
_ => (
bitmaps,
<Vec<BloomHashGroup>>::to_ngrams(&tokens, 3),
BLOOM_TRIGRAM,
),
_ => (),
}
let mut results = RoaringBitmap::new();
for document_id in bitmaps {
self.refresh_if_old().await?;
if let Some(term_index) = self
.get_value::<TermIndex>(ValueKey::term_index(
account_id,
collection,
document_id,
))
.await?
{
if term_index
.match_terms(
&phrase
.iter()
.map(|w| term_index.get_match_term(w, None))
.collect::<Vec<_>>(),
field.into(),
true,
false,
false,
)
.map_err(|e| {
crate::Error::InternalError(format!(
"TermIndex match_terms failed for {account_id}/{collection}/{document_id}: {e:?}"
))
})?
.is_some()
{
results.insert(document_id);
}
} else {
tracing::debug!(
event = "error",
context = "fts_query",
account_id = account_id,
collection = collection,
document_id = document_id,
"Document is missing a term index",
);
}
}
if !results.is_empty() {
Ok(Some(results))
} else {
Ok(None)
}
} else {
let mut bitmaps = RoaringBitmap::new();
@ -96,48 +126,7 @@ impl ReadTransaction<'_> {
};
}
return Ok(Some(bitmaps));
};
let b_count = bitmaps.len();
let mut bm = RoaringBitmap::new();
for document_id in bitmaps {
self.refresh_if_old().await?;
if let Some(bloom) = self
.get_value::<BloomFilter>(ValueKey {
account_id,
collection,
document_id,
family,
field,
})
.await?
{
if !bloom.is_empty() {
let mut matched = true;
for hash in &hashes {
if !(bloom.contains(&hash.h1)
|| hash.h2.as_ref().map_or(false, |h2| bloom.contains(h2)))
{
matched = false;
break;
}
}
if matched {
bm.insert(document_id);
}
}
}
Ok(Some(bitmaps))
}
println!(
"bloom_match {text:?} {b_count} items in {:?}ms",
real_now.elapsed().as_millis()
);
Ok(Some(bm))
}
}

View file

@ -379,48 +379,10 @@ impl Serialize for TermIndexBuilder {
}
impl Deserialize for TermIndex {
fn deserialize(bytes: &[u8]) -> Option<Self> {
let (num_tokens, mut pos) = bytes.read_leb128()?;
let mut token_map = AHashMap::with_capacity(num_tokens as usize);
for term_id in 0..num_tokens {
let nil_pos = bytes.get(pos..)?.iter().position(|b| b == &0)?;
token_map.insert(
String::from_utf8(bytes.get(pos..pos + nil_pos)?.to_vec()).ok()?,
term_id,
);
pos += nil_pos + 1;
}
let mut term_index = TermIndex {
items: Vec::new(),
token_map,
};
while pos < bytes.len() {
let item_len =
u32::from_le_bytes(bytes.get(pos..pos + LENGTH_SIZE)?.try_into().ok()?) as usize;
pos += LENGTH_SIZE;
let field = bytes.get(pos)?;
pos += 1;
let (part_id, bytes_read) = bytes.get(pos..)?.read_leb128()?;
pos += bytes_read;
let (terms_len, bytes_read) = bytes.get(pos..)?.read_leb128()?;
pos += bytes_read;
term_index.items.push(TermIndexItem {
field_id: *field,
part_id,
terms_len,
terms: bytes.get(pos..pos + item_len)?.to_vec(),
});
pos += item_len;
}
Some(term_index)
fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
TermIndex::from_bytes(bytes).ok_or_else(|| {
crate::Error::InternalError("Failed to deserialize term index".to_string())
})
}
}
@ -518,7 +480,7 @@ impl TermIndex {
pub fn match_terms(
&self,
match_terms: &[MatchTerm],
match_in: Option<AHashSet<u8>>,
match_field: Option<u8>,
match_phrase: bool,
match_many: bool,
include_offsets: bool,
@ -537,10 +499,8 @@ impl TermIndex {
let mut matched_mask = words_mask;
for item in &self.items {
if let Some(ref match_in) = match_in {
if !match_in.contains(&item.field_id) {
continue;
}
if match_field.map_or(false, |match_field| match_field != item.field_id) {
continue;
}
let mut terms = Vec::new();
@ -675,6 +635,50 @@ impl TermIndex {
None
})
}
fn from_bytes(bytes: &[u8]) -> Option<Self> {
let (num_tokens, mut pos) = bytes.read_leb128()?;
let mut token_map = AHashMap::with_capacity(num_tokens as usize);
for term_id in 0..num_tokens {
let nil_pos = bytes.get(pos..)?.iter().position(|b| b == &0)?;
token_map.insert(
String::from_utf8(bytes.get(pos..pos + nil_pos)?.to_vec()).ok()?,
term_id,
);
pos += nil_pos + 1;
}
let mut term_index = TermIndex {
items: Vec::new(),
token_map,
};
while pos < bytes.len() {
let item_len =
u32::from_le_bytes(bytes.get(pos..pos + LENGTH_SIZE)?.try_into().ok()?) as usize;
pos += LENGTH_SIZE;
let field = bytes.get(pos)?;
pos += 1;
let (part_id, bytes_read) = bytes.get(pos..)?.read_leb128()?;
pos += bytes_read;
let (terms_len, bytes_read) = bytes.get(pos..)?.read_leb128()?;
pos += bytes_read;
term_index.items.push(TermIndexItem {
field_id: *field,
part_id,
terms_len,
terms: bytes.get(pos..pos + item_len)?.to_vec(),
});
pos += item_len;
}
Some(term_index)
}
}
#[derive(Default)]
@ -690,7 +694,15 @@ pub struct TokenIndex {
}
impl Deserialize for TokenIndex {
fn deserialize(bytes: &[u8]) -> Option<Self> {
fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
Self::from_bytes(bytes).ok_or_else(|| {
crate::Error::InternalError("Failed to deserialize token index.".to_string())
})
}
}
impl TokenIndex {
fn from_bytes(bytes: &[u8]) -> Option<Self> {
let (num_tokens, mut pos) = bytes.read_leb128::<u32>()?;
let mut tokens = Vec::with_capacity(num_tokens as usize);
for _ in 0..num_tokens {
@ -754,7 +766,7 @@ impl Deserialize for TokenIndex {
#[cfg(test)]
mod tests {
use ahash::{AHashMap, AHashSet};
use ahash::AHashMap;
use crate::{
fts::{
@ -932,17 +944,7 @@ mod tests {
}
let result = term_index
.match_terms(
&match_terms,
field_id.and_then(|f| {
let mut h = AHashSet::default();
h.insert(f);
Some(h)
}),
match_phrase,
true,
true,
)
.match_terms(&match_terms, field_id, match_phrase, true, true)
.unwrap()
.unwrap_or_default();

View file

@ -1,5 +1,7 @@
use utils::codec::leb128::Leb128Iterator;
use crate::{write::key::DeserializeBigEndian, Error, LogKey, Store};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Change {
Insert(u64),
@ -32,6 +34,73 @@ impl Default for Changes {
}
}
impl Store {
pub async fn changes(
&self,
account_id: u32,
collection: impl Into<u8>,
query: Query,
) -> crate::Result<Option<Changes>> {
let collection = collection.into();
let (is_inclusive, from_change_id, to_change_id) = match query {
Query::All => (true, 0, u64::MAX),
Query::Since(change_id) => (false, change_id, u64::MAX),
Query::SinceInclusive(change_id) => (true, change_id, u64::MAX),
Query::RangeInclusive(from_change_id, to_change_id) => {
(true, from_change_id, to_change_id)
}
};
let from_key = LogKey {
account_id,
collection,
change_id: from_change_id,
};
let to_key = LogKey {
account_id,
collection,
change_id: to_change_id,
};
let mut changelog = self
.iterate(
Changes::default(),
from_key,
to_key,
false,
true,
move |changelog, key, value| {
let change_id =
key.deserialize_be_u64(key.len() - std::mem::size_of::<u64>())?;
if !is_inclusive || change_id != from_change_id {
if changelog.changes.is_empty() {
changelog.from_change_id = change_id;
}
changelog.to_change_id = change_id;
changelog.deserialize(value).ok_or_else(|| {
Error::InternalError(format!(
"Failed to deserialize changelog for [{}/{:?}]: [{:?}]",
account_id, collection, query
))
})?;
}
Ok(true)
},
)
.await?;
if changelog.changes.is_empty() {
changelog.from_change_id = from_change_id;
changelog.to_change_id = if to_change_id != u64::MAX {
to_change_id
} else {
from_change_id
};
}
Ok(Some(changelog))
}
}
impl Changes {
pub fn deserialize(&mut self, bytes: &[u8]) -> Option<()> {
let mut bytes_it = bytes.iter();

View file

@ -60,8 +60,8 @@ pub enum Comparator {
#[derive(Debug)]
pub struct ResultSet {
account_id: u32,
collection: u8,
pub account_id: u32,
pub collection: u8,
pub results: RoaringBitmap,
}

View file

@ -9,7 +9,7 @@ use super::{Comparator, ResultSet, SortedResultSet};
pub struct Pagination {
requested_position: i32,
position: i32,
limit: usize,
pub limit: usize,
anchor: u32,
anchor_offset: i32,
has_anchor: bool,
@ -228,14 +228,7 @@ impl Store {
}
impl Pagination {
pub fn new(
limit: usize,
position: i32,
anchor: Option<u32>,
anchor_offset: i32,
prefix_key: Option<ValueKey>,
prefix_unique: bool,
) -> Self {
pub fn new(limit: usize, position: i32, anchor: Option<u32>, anchor_offset: i32) -> Self {
let (has_anchor, anchor) = anchor.map(|anchor| (true, anchor)).unwrap_or((false, 0));
Self {
@ -247,11 +240,21 @@ impl Pagination {
has_anchor,
anchor_found: false,
ids: Vec::with_capacity(limit),
prefix_key,
prefix_unique,
prefix_key: None,
prefix_unique: false,
}
}
pub fn with_prefix_key(mut self, prefix_key: ValueKey) -> Self {
self.prefix_key = Some(prefix_key);
self
}
pub fn with_prefix_unique(mut self, prefix_unique: bool) -> Self {
self.prefix_unique = prefix_unique;
self
}
pub fn add(&mut self, prefix_id: u32, document_id: u32) -> bool {
let id = ((prefix_id as u64) << 32) | document_id as u64;

View file

@ -13,6 +13,12 @@ pub enum AssertValue {
Hash(u64),
}
impl<T: Deserialize + Default> HashedValue<T> {
pub fn take(&mut self) -> T {
std::mem::take(&mut self.inner)
}
}
pub trait ToAssertValue {
fn to_assert_value(&self) -> AssertValue;
}

View file

@ -1,7 +1,9 @@
use std::convert::TryInto;
use utils::codec::leb128::Leb128_;
use crate::{AclKey, BitmapKey, IndexKey, IndexKeyPrefix, LogKey, Serialize, ValueKey};
use crate::{
AclKey, BitmapKey, IndexKey, IndexKeyPrefix, Key, LogKey, Serialize, ValueKey, SUBSPACE_LOGS,
};
pub struct KeySerializer {
buf: Vec<u8>,
@ -132,6 +134,16 @@ impl ValueKey {
}
}
pub fn term_index(account_id: u32, collection: impl Into<u8>, document_id: u32) -> Self {
ValueKey {
account_id,
collection: collection.into(),
document_id,
family: u8::MAX,
field: u8::MAX,
}
}
pub fn with_document_id(self, document_id: u32) -> Self {
Self {
document_id,
@ -274,3 +286,15 @@ impl Serialize for &LogKey {
.finalize()
}
}
impl Serialize for LogKey {
fn serialize(self) -> Vec<u8> {
(&self).serialize()
}
}
impl Key for LogKey {
fn subspace(&self) -> u8 {
SUBSPACE_LOGS
}
}

View file

@ -20,6 +20,13 @@ pub struct Changes {
}
impl ChangeLogBuilder {
pub fn new() -> ChangeLogBuilder {
ChangeLogBuilder {
change_id: u64::MAX,
changes: VecMap::default(),
}
}
pub fn with_change_id(change_id: u64) -> ChangeLogBuilder {
ChangeLogBuilder {
change_id,
@ -65,6 +72,26 @@ impl ChangeLogBuilder {
change.deletes.insert(old_jmap_id.into());
change.inserts.insert(new_jmap_id.into());
}
pub fn merge(&mut self, changes: ChangeLogBuilder) {
for (collection, other) in changes.changes {
let this = self.changes.get_mut_or_insert(collection);
for id in other.deletes {
if !this.inserts.remove(&id) {
this.deletes.insert(id);
}
this.updates.remove(&id);
this.child_updates.remove(&id);
}
this.inserts.extend(other.inserts);
this.updates.extend(other.updates);
this.child_updates.extend(other.child_updates);
}
}
pub fn is_empty(&self) -> bool {
self.changes.is_empty()
}
}
impl IntoOperations for ChangeLogBuilder {

View file

@ -101,6 +101,12 @@ impl Serialize for &str {
}
}
impl Serialize for &String {
fn serialize(self) -> Vec<u8> {
self.as_bytes().to_vec()
}
}
impl Serialize for String {
fn serialize(self) -> Vec<u8> {
self.into_bytes()

773
tests/src/jmap/mailbox.rs Normal file
View file

@ -0,0 +1,773 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::sync::Arc;
use jmap::JMAP;
use jmap_client::{
client::Client,
core::{
query::Filter,
set::{SetError, SetErrorType, SetObject, SetRequest},
},
mailbox::{self, Mailbox, Role},
Error, Set,
};
use jmap_proto::types::{id::Id, state::State};
use serde::{Deserialize, Serialize};
use store::ahash::AHashMap;
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Mailbox tests...");
// Create test mailboxes
let id_map = create_test_mailboxes(client).await;
// Sort by name
assert_eq!(
client
.mailbox_query(
None::<mailbox::query::Filter>,
[mailbox::query::Comparator::name()].into()
)
.await
.unwrap()
.ids()
.iter()
.map(|id| id_map.get(id).unwrap())
.collect::<Vec<_>>(),
[
"drafts",
"spam2",
"inbox",
"1",
"2",
"3",
"sent",
"spam",
"1.1",
"1.2",
"trash",
"spam1",
"1.1.1.1",
"1.1.1.1.1",
"1.1.1",
"1.2.1"
]
);
// Sort by name as tree
let mut request = client.build();
request
.query_mailbox()
.sort([mailbox::query::Comparator::name()])
.arguments()
.sort_as_tree(true);
assert_eq!(
request
.send_query_mailbox()
.await
.unwrap()
.ids()
.iter()
.map(|id| id_map.get(id).unwrap())
.collect::<Vec<_>>(),
[
"drafts",
"inbox",
"1",
"1.1",
"1.1.1",
"1.1.1.1",
"1.1.1.1.1",
"1.2",
"1.2.1",
"2",
"3",
"sent",
"spam",
"spam1",
"spam2",
"trash"
]
);
// Sort as tree with filters
let mut request = client.build();
request
.query_mailbox()
.filter(mailbox::query::Filter::name("level"))
.sort([mailbox::query::Comparator::name()])
.arguments()
.sort_as_tree(true);
assert_eq!(
request
.send_query_mailbox()
.await
.unwrap()
.ids()
.iter()
.map(|id| id_map.get(id).unwrap())
.collect::<Vec<_>>(),
[
"1",
"1.1",
"1.1.1",
"1.1.1.1",
"1.1.1.1.1",
"1.2",
"1.2.1",
"2",
"3"
]
);
// Filter as tree
let mut request = client.build();
request
.query_mailbox()
.filter(mailbox::query::Filter::name("spam"))
.sort([mailbox::query::Comparator::name()])
.arguments()
.filter_as_tree(true)
.sort_as_tree(true);
assert_eq!(
request
.send_query_mailbox()
.await
.unwrap()
.ids()
.iter()
.map(|id| id_map.get(id).unwrap())
.collect::<Vec<_>>(),
["spam", "spam1", "spam2"]
);
let mut request = client.build();
request
.query_mailbox()
.filter(mailbox::query::Filter::name("level"))
.sort([mailbox::query::Comparator::name()])
.arguments()
.filter_as_tree(true)
.sort_as_tree(true);
assert_eq!(
request.send_query_mailbox().await.unwrap().ids(),
Vec::<&str>::new()
);
// Filter by role
assert_eq!(
client
.mailbox_query(
mailbox::query::Filter::role(Role::Inbox).into(),
[mailbox::query::Comparator::name()].into()
)
.await
.unwrap()
.ids()
.iter()
.map(|id| id_map.get(id).unwrap())
.collect::<Vec<_>>(),
["inbox"]
);
assert_eq!(
client
.mailbox_query(
mailbox::query::Filter::has_any_role(true).into(),
[mailbox::query::Comparator::name()].into()
)
.await
.unwrap()
.ids()
.iter()
.map(|id| id_map.get(id).unwrap())
.collect::<Vec<_>>(),
["drafts", "inbox", "sent", "spam", "trash"]
);
// Duplicate role
let mut request = client.build();
request
.set_mailbox()
.update(&id_map["sent"])
.role(Role::Inbox);
assert!(matches!(
request
.send_set_mailbox()
.await
.unwrap()
.updated(&id_map["sent"]),
Err(Error::Set(SetError {
type_: SetErrorType::InvalidProperties,
..
}))
));
// Duplicate name
let mut request = client.build();
request.set_mailbox().update(&id_map["2"]).name("Level 3");
assert!(matches!(
request
.send_set_mailbox()
.await
.unwrap()
.updated(&id_map["2"]),
Err(Error::Set(SetError {
type_: SetErrorType::InvalidProperties,
..
}))
));
// Circular relationship
let mut request = client.build();
request
.set_mailbox()
.update(&id_map["1"])
.parent_id((&id_map["1.1.1.1.1"]).into());
assert!(matches!(
request
.send_set_mailbox()
.await
.unwrap()
.updated(&id_map["1"]),
Err(Error::Set(SetError {
type_: SetErrorType::InvalidProperties,
..
}))
));
let mut request = client.build();
request
.set_mailbox()
.update(&id_map["1"])
.parent_id((&id_map["1"]).into());
assert!(matches!(
request
.send_set_mailbox()
.await
.unwrap()
.updated(&id_map["1"]),
Err(Error::Set(SetError {
type_: SetErrorType::InvalidProperties,
..
}))
));
// Invalid parentId
let mut request = client.build();
request
.set_mailbox()
.update(&id_map["1"])
.parent_id(Id::new(u64::MAX).to_string().into());
assert!(matches!(
request
.send_set_mailbox()
.await
.unwrap()
.updated(&id_map["1"]),
Err(Error::Set(SetError {
type_: SetErrorType::InvalidProperties,
..
}))
));
// Obtain state
let state = client
.mailbox_changes(State::Initial.to_string(), 0)
.await
.unwrap()
.new_state()
.to_string();
// Rename and move mailbox
let mut request = client.build();
request
.set_mailbox()
.update(&id_map["1.1.1.1.1"])
.name("Renamed and moved")
.parent_id((&id_map["2"]).into());
assert!(request
.send_set_mailbox()
.await
.unwrap()
.updated(&id_map["1.1.1.1.1"])
.is_ok());
// Verify changes
let state = client.mailbox_changes(state, 0).await.unwrap();
assert_eq!(state.created().len(), 0);
assert_eq!(state.updated().len(), 1);
assert_eq!(state.destroyed().len(), 0);
assert_eq!(state.arguments().updated_properties(), None);
let state = state.new_state().to_string();
// Insert email into Inbox
let mail_id = client
.email_import(
b"From: test@test.com\nSubject: hey\n\ntest".to_vec(),
[&id_map["inbox"]],
None::<Vec<&str>>,
None,
)
.await
.unwrap()
.take_id();
// Inbox's total and unread count should have increased
let inbox = client
.mailbox_get(
&id_map["inbox"],
[
mailbox::Property::TotalEmails,
mailbox::Property::UnreadEmails,
mailbox::Property::TotalThreads,
mailbox::Property::UnreadThreads,
]
.into(),
)
.await
.unwrap()
.unwrap();
assert_eq!(inbox.total_emails(), 1);
assert_eq!(inbox.unread_emails(), 1);
assert_eq!(inbox.total_threads(), 1);
assert_eq!(inbox.unread_threads(), 1);
// Set email to read and fetch properties again
client
.email_set_keyword(&mail_id, "$seen", true)
.await
.unwrap();
let inbox = client
.mailbox_get(
&id_map["inbox"],
[
mailbox::Property::TotalEmails,
mailbox::Property::UnreadEmails,
mailbox::Property::TotalThreads,
mailbox::Property::UnreadThreads,
]
.into(),
)
.await
.unwrap()
.unwrap();
assert_eq!(inbox.total_emails(), 1);
assert_eq!(inbox.unread_emails(), 0);
assert_eq!(inbox.total_threads(), 1);
assert_eq!(inbox.unread_threads(), 0);
// Only email properties must have changed
let prev_state = state.clone();
let state = client.mailbox_changes(state, 0).await.unwrap();
assert_eq!(state.created().len(), 0);
assert_eq!(
state
.updated()
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>(),
&[&id_map["inbox"]]
);
assert_eq!(state.destroyed().len(), 0);
assert_eq!(
state.arguments().updated_properties(),
Some(
&[
mailbox::Property::TotalEmails,
mailbox::Property::UnreadEmails,
mailbox::Property::TotalThreads,
mailbox::Property::UnreadThreads,
][..]
)
);
let state = state.new_state().to_string();
// Use updatedProperties in a query
let mut request = client.build();
let changes_request = request.changes_mailbox(prev_state).max_changes(0);
let properties_ref = changes_request.updated_properties_reference();
let updated_ref = changes_request.updated_reference();
request
.get_mailbox()
.ids_ref(updated_ref)
.properties_ref(properties_ref);
let mut changed_mailboxes = request
.send()
.await
.unwrap()
.unwrap_method_responses()
.pop()
.unwrap()
.unwrap_get_mailbox()
.unwrap()
.take_list();
assert_eq!(changed_mailboxes.len(), 1);
let inbox = changed_mailboxes.pop().unwrap();
assert_eq!(inbox.id().unwrap(), &id_map["inbox"]);
assert_eq!(inbox.total_emails(), 1);
assert_eq!(inbox.unread_emails(), 0);
assert_eq!(inbox.total_threads(), 1);
assert_eq!(inbox.unread_threads(), 0);
assert_eq!(inbox.name(), None);
assert_eq!(inbox.my_rights(), None);
// Move email from Inbox to Trash
client
.email_set_mailboxes(&mail_id, [&id_map["trash"]])
.await
.unwrap();
// E-mail properties of both Inbox and Trash must have changed
let state = client.mailbox_changes(state, 0).await.unwrap();
assert_eq!(state.created().len(), 0);
assert_eq!(state.updated().len(), 2);
assert_eq!(state.destroyed().len(), 0);
let mut folder_ids = vec![&id_map["trash"], &id_map["inbox"]];
let mut updated_ids = state
.updated()
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>();
updated_ids.sort_unstable();
folder_ids.sort_unstable();
assert_eq!(updated_ids, folder_ids);
assert_eq!(
state.arguments().updated_properties(),
Some(
&[
mailbox::Property::TotalEmails,
mailbox::Property::UnreadEmails,
mailbox::Property::TotalThreads,
mailbox::Property::UnreadThreads,
][..]
)
);
// Deleting folders with children is not allowed
let mut request = client.build();
request.set_mailbox().destroy([&id_map["1"]]);
assert!(matches!(
request
.send_set_mailbox()
.await
.unwrap()
.destroyed(&id_map["1"]),
Err(Error::Set(SetError {
type_: SetErrorType::MailboxHasChild,
..
}))
));
// Deleting folders with contents is not allowed (unless remove_emails is true)
let mut request = client.build();
request.set_mailbox().destroy([&id_map["trash"]]);
assert!(matches!(
request
.send_set_mailbox()
.await
.unwrap()
.destroyed(&id_map["trash"]),
Err(Error::Set(SetError {
type_: SetErrorType::MailboxHasEmail,
..
}))
));
// Delete Trash folder and its contents
let mut request = client.build();
request
.set_mailbox()
.destroy([&id_map["trash"]])
.arguments()
.on_destroy_remove_emails(true);
assert!(request
.send_set_mailbox()
.await
.unwrap()
.destroyed(&id_map["trash"])
.is_ok());
// Verify that Trash folder and its contents are gone
assert!(client
.mailbox_get(&id_map["trash"], None::<Vec<_>>)
.await
.unwrap()
.is_none());
assert!(client
.email_get(&mail_id, None::<Vec<_>>)
.await
.unwrap()
.is_none());
// Check search results after changing folder properties
let mut request = client.build();
request
.set_mailbox()
.update(&id_map["drafts"])
.name("Borradores")
.sort_order(100)
.parent_id((&id_map["2"]).into())
.role(Role::None);
assert!(request
.send_set_mailbox()
.await
.unwrap()
.updated(&id_map["drafts"])
.is_ok());
assert_eq!(
client
.mailbox_query(
Filter::and([
mailbox::query::Filter::name("Borradores").into(),
mailbox::query::Filter::parent_id((&id_map["2"]).into()).into(),
Filter::not([mailbox::query::Filter::has_any_role(true)])
])
.into(),
[mailbox::query::Comparator::name()].into()
)
.await
.unwrap()
.ids()
.iter()
.map(|id| id_map.get(id).unwrap())
.collect::<Vec<_>>(),
["drafts"]
);
assert!(client
.mailbox_query(
mailbox::query::Filter::name("Drafts").into(),
[mailbox::query::Comparator::name()].into()
)
.await
.unwrap()
.ids()
.is_empty());
assert!(client
.mailbox_query(
mailbox::query::Filter::role(Role::Drafts).into(),
[mailbox::query::Comparator::name()].into()
)
.await
.unwrap()
.ids()
.is_empty());
assert_eq!(
client
.mailbox_query(
mailbox::query::Filter::parent_id(None::<&str>).into(),
[mailbox::query::Comparator::name()].into()
)
.await
.unwrap()
.ids()
.iter()
.map(|id| id_map.get(id).unwrap())
.collect::<Vec<_>>(),
["inbox", "sent", "spam"]
);
assert_eq!(
client
.mailbox_query(
mailbox::query::Filter::has_any_role(true).into(),
[mailbox::query::Comparator::name()].into()
)
.await
.unwrap()
.ids()
.iter()
.map(|id| id_map.get(id).unwrap())
.collect::<Vec<_>>(),
["inbox", "sent", "spam"]
);
let mut request = client.build();
request.query_mailbox().arguments().sort_as_tree(true);
let mut ids = request.send_query_mailbox().await.unwrap().take_ids();
ids.reverse();
for id in ids {
client.mailbox_destroy(&id, true).await.unwrap();
}
let todo = "yes";
//server.store.assert_is_empty();
}
async fn create_test_mailboxes(client: &mut Client) -> AHashMap<String, String> {
let mut mailbox_map = AHashMap::default();
let mut request = client.build();
build_create_query(
request.set_mailbox(),
&mut mailbox_map,
serde_json::from_slice(TEST_MAILBOXES).unwrap(),
None,
);
let mut result = request.send_set_mailbox().await.unwrap();
let mut id_map = AHashMap::with_capacity(mailbox_map.len());
for (create_id, local_id) in mailbox_map {
let server_id = result.created(&create_id).unwrap().take_id();
id_map.insert(local_id.clone(), server_id.clone());
id_map.insert(server_id, local_id);
}
id_map
}
fn build_create_query(
request: &mut SetRequest<Mailbox<Set>>,
mailbox_map: &mut AHashMap<String, String>,
mailboxes: Vec<TestMailbox>,
parent_id: Option<String>,
) {
for mailbox in mailboxes {
let create_mailbox = request
.create()
.name(mailbox.name)
.sort_order(mailbox.order);
if let Some(role) = mailbox.role {
create_mailbox.role(role);
}
if let Some(parent_id) = &parent_id {
create_mailbox.parent_id_ref(parent_id);
}
let create_mailbox_id = create_mailbox.create_id().unwrap();
mailbox_map.insert(create_mailbox_id.clone(), mailbox.id);
if let Some(children) = mailbox.children {
build_create_query(request, mailbox_map, children, create_mailbox_id.into());
}
}
}
#[derive(Serialize, Deserialize)]
struct TestMailbox {
id: String,
name: String,
role: Option<Role>,
order: u32,
children: Option<Vec<TestMailbox>>,
}
const TEST_MAILBOXES: &[u8] = br#"
[
{
"id": "inbox",
"name": "Inbox",
"role": "INBOX",
"order": 5,
"children": [
{
"name": "Level 1",
"id": "1",
"order": 4,
"children": [
{
"name": "Sub-Level 1.1",
"id": "1.1",
"order": 3,
"children": [
{
"name": "Z-Sub-Level 1.1.1",
"id": "1.1.1",
"order": 2,
"children": [
{
"name": "X-Sub-Level 1.1.1.1",
"id": "1.1.1.1",
"order": 1,
"children": [
{
"name": "Y-Sub-Level 1.1.1.1.1",
"id": "1.1.1.1.1",
"order": 0
}
]
}
]
}
]
},
{
"name": "Sub-Level 1.2",
"id": "1.2",
"order": 7,
"children": [
{
"name": "Z-Sub-Level 1.2.1",
"id": "1.2.1",
"order": 6
}
]
}
]
},
{
"name": "Level 2",
"id": "2",
"order": 8
},
{
"name": "Level 3",
"id": "3",
"order": 9
}
]
},
{
"id": "sent",
"name": "Sent",
"role": "SENT",
"order": 15
},
{
"id": "drafts",
"name": "Drafts",
"role": "DRAFTS",
"order": 14
},
{
"id": "trash",
"name": "Trash",
"role": "TRASH",
"order": 13
},
{
"id": "spam",
"name": "Spam",
"role": "JUNK",
"order": 12,
"children": [{
"id": "spam1",
"name": "Work Spam",
"order": 11,
"children": [{
"id": "spam2",
"name": "Friendly Spam",
"order": 10
}]
}]
}
]
"#;

View file

@ -10,6 +10,7 @@ use crate::{add_test_certs, store::TempDir};
pub mod email_get;
pub mod email_query;
pub mod email_set;
pub mod mailbox;
pub mod thread_get;
pub mod thread_merge;
@ -51,10 +52,11 @@ pub async fn jmap_tests() {
let delete = true;
let mut params = init_jmap_tests(delete).await;
//email_get::test(params.server.clone(), &mut params.client).await;
email_set::test(params.server.clone(), &mut params.client).await;
//email_set::test(params.server.clone(), &mut params.client).await;
//email_query::test(params.server.clone(), &mut params.client, delete).await;
//thread_get::test(params.server.clone(), &mut params.client).await;
//thread_merge::test(params.server.clone(), &mut params.client).await;
mailbox::test(params.server.clone(), &mut params.client).await;
if delete {
params.temp_dir.delete();
}

View file

@ -327,7 +327,7 @@ pub async fn test_filter(db: Arc<Store>) {
.sort(
docset,
vec![Comparator::ascending(fields["accession_number"])],
Pagination::new(0, 0, None, 0, None, false),
Pagination::new(0, 0, None, 0),
)
.await
.unwrap();
@ -423,7 +423,7 @@ pub async fn test_sort(db: Arc<Store>) {
.sort(
docset,
sort,
Pagination::new(expected_results.len(), 0, None, 0, None, false),
Pagination::new(expected_results.len(), 0, None, 0),
)
.await
.unwrap();