Updated import/export to use latest schema

This commit is contained in:
mdecimus 2024-05-03 11:16:51 +01:00
parent 17d203c928
commit afe63c3e9d
14 changed files with 416 additions and 353 deletions

View file

@ -36,8 +36,8 @@ use store::{
key::DeserializeBigEndian, AnyKey, BitmapClass, BitmapHash, BlobOp, DirectoryClass,
LookupClass, QueueClass, QueueEvent, TagValue, ValueClass,
},
BitmapKey, Deserialize, IndexKey, IterateParams, LogKey, Serialize, ValueKey, SUBSPACE_BITMAPS,
U32_LEN, U64_LEN,
BitmapKey, Deserialize, IndexKey, IterateParams, LogKey, Serialize, ValueKey,
SUBSPACE_BITMAP_ID, SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, U32_LEN, U64_LEN,
};
use utils::{
@ -47,7 +47,6 @@ use utils::{
use crate::Core;
const KEY_OFFSET: usize = 1;
pub(super) const MAGIC_MARKER: u8 = 123;
pub(super) const FILE_VERSION: u8 = 1;
@ -141,10 +140,10 @@ impl Core {
)
.no_values(),
|key, _| {
let account_id = key.deserialize_be_u32(KEY_OFFSET)?;
let collection = key.deserialize_u8(KEY_OFFSET + U32_LEN)?;
let field = key.deserialize_u8(KEY_OFFSET + U32_LEN + 1)?;
let document_id = key.deserialize_be_u32(KEY_OFFSET + U32_LEN + 2)?;
let account_id = key.deserialize_be_u32(0)?;
let collection = key.deserialize_u8(U32_LEN)?;
let field = key.deserialize_u8(U32_LEN + 1)?;
let document_id = key.deserialize_be_u32(U32_LEN + 2)?;
keys.insert((account_id, collection, document_id, field));
@ -253,11 +252,10 @@ impl Core {
)
.no_values(),
|key, _| {
let account_id = key.deserialize_be_u32(KEY_OFFSET)?;
let collection = key.deserialize_u8(KEY_OFFSET + U32_LEN)?;
let document_id = key
.range(KEY_OFFSET + U32_LEN + 1..usize::MAX)?
.deserialize_leb128()?;
let account_id = key.deserialize_be_u32(0)?;
let collection = key.deserialize_u8(U32_LEN)?;
let document_id =
key.range(U32_LEN + 1..usize::MAX)?.deserialize_leb128()?;
keys.insert((account_id, collection, document_id));
@ -340,11 +338,10 @@ impl Core {
},
),
|key, value| {
let grant_account_id = key.deserialize_be_u32(KEY_OFFSET)?;
let account_id = key.deserialize_be_u32(KEY_OFFSET + U32_LEN)?;
let collection = key.deserialize_u8(KEY_OFFSET + (U32_LEN * 2))?;
let document_id =
key.deserialize_be_u32(KEY_OFFSET + (U32_LEN * 2) + 1)?;
let grant_account_id = key.deserialize_be_u32(0)?;
let account_id = key.deserialize_be_u32(U32_LEN)?;
let collection = key.deserialize_u8(U32_LEN * 2)?;
let document_id = key.deserialize_be_u32((U32_LEN * 2) + 1)?;
if account_id != last_account_id {
writer
@ -417,13 +414,12 @@ impl Core {
},
),
|key, _| {
let account_id = key.deserialize_be_u32(KEY_OFFSET + BLOB_HASH_LEN)?;
let collection =
key.deserialize_u8(KEY_OFFSET + BLOB_HASH_LEN + U32_LEN)?;
let account_id = key.deserialize_be_u32(BLOB_HASH_LEN)?;
let collection = key.deserialize_u8(BLOB_HASH_LEN + U32_LEN)?;
let document_id =
key.deserialize_be_u32(KEY_OFFSET + BLOB_HASH_LEN + U32_LEN + 1)?;
key.deserialize_be_u32(BLOB_HASH_LEN + U32_LEN + 1)?;
let hash = key.range(KEY_OFFSET..KEY_OFFSET + BLOB_HASH_LEN)?.to_vec();
let hash = key.range(0..BLOB_HASH_LEN)?.to_vec();
if account_id != u32::MAX && document_id != u32::MAX {
writer
@ -510,10 +506,7 @@ impl Core {
),
|key, value| {
writer
.send(Op::KeyValue((
key.range(KEY_OFFSET..usize::MAX)?.to_vec(),
value.to_vec(),
)))
.send(Op::KeyValue((key.to_vec(), value.to_vec())))
.failed("Failed to send key value");
Ok(true)
@ -560,10 +553,7 @@ impl Core {
),
|key, value| {
writer
.send(Op::KeyValue((
key.range(KEY_OFFSET..usize::MAX)?.to_vec(),
value.to_vec(),
)))
.send(Op::KeyValue((key.to_vec(), value.to_vec())))
.failed("Failed to send key value");
Ok(true)
@ -576,41 +566,6 @@ impl Core {
.send(Op::Family(Family::LookupCounter))
.failed("Failed to send family");
let mut expired_counters = AHashSet::new();
store
.iterate(
IterateParams::new(
ValueKey {
account_id: 0,
collection: 0,
document_id: 0,
class: ValueClass::Lookup(LookupClass::CounterExpiry(vec![0])),
},
ValueKey {
account_id: u32::MAX,
collection: u8::MAX,
document_id: u32::MAX,
class: ValueClass::Lookup(LookupClass::CounterExpiry(vec![
u8::MAX,
u8::MAX,
u8::MAX,
u8::MAX,
u8::MAX,
u8::MAX,
])),
},
)
.no_values(),
|key, _| {
expired_counters.insert(key.range(KEY_OFFSET..usize::MAX)?.to_vec());
Ok(true)
},
)
.await
.failed("Failed to iterate over data store");
let mut counters = Vec::new();
store
@ -638,9 +593,11 @@ impl Core {
)
.no_values(),
|key, _| {
let key = key.range(KEY_OFFSET..usize::MAX)?.to_vec();
if !expired_counters.contains(&key) {
counters.push(key);
if (key.len() != (U32_LEN * 2) + 2)
|| key[U32_LEN + 1] != 84
|| key[U32_LEN] != 1
{
counters.push(key.to_vec());
}
Ok(true)
@ -699,15 +656,12 @@ impl Core {
},
),
|key, value| {
let mut key = key.to_vec();
key[0] -= 20;
if key[0] == 2 {
principal_ids.push(key.as_slice().range(1..usize::MAX)?.to_vec());
principal_ids.push(key.range(1..usize::MAX)?.to_vec());
}
writer
.send(Op::KeyValue((key, value.to_vec())))
.send(Op::KeyValue((key.to_vec(), value.to_vec())))
.failed("Failed to send key value");
Ok(true)
@ -761,6 +715,40 @@ impl Core {
document_id: 0,
class: ValueClass::Queue(QueueClass::Message(0)),
},
ValueKey {
account_id: u32::MAX,
collection: u8::MAX,
document_id: u32::MAX,
class: ValueClass::Queue(QueueClass::Message(u64::MAX)),
},
),
|key_, value| {
let mut key = Vec::with_capacity(U64_LEN + 1);
key.push(0);
key.extend_from_slice(key_);
writer
.send(Op::KeyValue((key, value.to_vec())))
.failed("Failed to send key value");
Ok(true)
},
)
.await
.failed("Failed to iterate over data store");
store
.iterate(
IterateParams::new(
ValueKey {
account_id: 0,
collection: 0,
document_id: 0,
class: ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: 0,
queue_id: 0,
})),
},
ValueKey {
account_id: u32::MAX,
collection: u8::MAX,
@ -771,9 +759,10 @@ impl Core {
})),
},
),
|key, value| {
let mut key = key.to_vec();
key[0] -= 50;
|key_, value| {
let mut key = Vec::with_capacity(U64_LEN + 1);
key.push(1);
key.extend_from_slice(key_);
writer
.send(Op::KeyValue((key, value.to_vec())))
@ -861,88 +850,101 @@ impl Core {
fn backup_bitmaps(&self, dest: &Path) -> TaskHandle {
let store = self.storage.data.clone();
let has_doc_id = store.id() != "rocksdb";
let (handle, writer) = spawn_writer(dest.join("bitmap"));
(
tokio::spawn(async move {
const BM_DOCUMENT_IDS: u8 = 0;
const BM_TEXT: u8 = 1 << 7;
const TAG_ID: u8 = 1 << 6;
const TAG_TEXT: u8 = 1 << 0 | 1 << 6;
const TAG_STATIC: u8 = 1 << 1 | 1 << 6;
const BM_MARKER: u8 = 1 << 7;
writer
.send(Op::Family(Family::Bitmap))
.failed("Failed to send family");
let mut bitmaps: AHashMap<(u32, u8), AHashSet<BitmapClass>> = AHashMap::new();
let mut bitmaps: AHashMap<(u32, u8), AHashSet<BitmapClass<u32>>> = AHashMap::new();
for subspace in [
SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG,
SUBSPACE_BITMAP_TEXT,
] {
store
.iterate(
IterateParams::new(
AnyKey {
subspace: SUBSPACE_BITMAPS,
subspace,
key: vec![0u8],
},
AnyKey {
subspace: SUBSPACE_BITMAPS,
subspace,
key: vec![u8::MAX; 10],
},
)
.no_values(),
|key, _| {
let account_id = key.deserialize_be_u32(0)?;
let key = key.range(0..key.len() - U32_LEN)?;
match subspace {
SUBSPACE_BITMAP_ID => {
let collection = key.deserialize_u8(U32_LEN)?;
let entry = bitmaps.entry((account_id, collection)).or_default();
let key = if has_doc_id {
key.range(0..key.len() - U32_LEN)?
} else {
key
bitmaps
.entry((account_id, collection))
.or_default()
.insert(BitmapClass::DocumentIds);
}
SUBSPACE_BITMAP_TAG => {
let collection = key.deserialize_u8(U32_LEN)?;
let value = key.range(U32_LEN + 2..usize::MAX)?;
let (field, value) = match key
.deserialize_u8(U32_LEN + 1)?
{
field if field & BM_MARKER == 0 => {
(field, TagValue::Id(value.deserialize_leb128()?))
}
field => {
(field & !BM_MARKER, TagValue::Text(value.to_vec()))
}
};
match key.deserialize_u8(U32_LEN + 1)? {
BM_DOCUMENT_IDS => {
entry.insert(BitmapClass::DocumentIds);
bitmaps
.entry((account_id, collection))
.or_default()
.insert(BitmapClass::Tag { field, value });
}
TAG_ID => {
entry.insert(BitmapClass::Tag {
field: key.deserialize_u8(U32_LEN + 2)?,
value: TagValue::Id(
key.range(U32_LEN + 3..usize::MAX)?
.deserialize_leb128()?,
),
});
}
TAG_TEXT => {
entry.insert(BitmapClass::Tag {
field: key.deserialize_u8(U32_LEN + 2)?,
value: TagValue::Text(
key.range(U32_LEN + 3..usize::MAX)?.to_vec(),
),
});
}
TAG_STATIC => {
entry.insert(BitmapClass::Tag {
field: key.deserialize_u8(U32_LEN + 2)?,
value: TagValue::Static(key.deserialize_u8(U32_LEN + 3)?),
});
}
text => {
entry.insert(BitmapClass::Text {
field: key.deserialize_u8(U32_LEN + 2)?,
token: BitmapHash {
hash: key
.range(U32_LEN + 3..U32_LEN + 11)?
.try_into()
.unwrap(),
len: text & !BM_TEXT,
},
SUBSPACE_BITMAP_TEXT => {
let collection = key.deserialize_u8(key.len() - 2)?;
let mut hash = [0u8; 8];
let (hash, len) = match key.len() - U32_LEN - 2 {
9 => {
hash[..8].copy_from_slice(
key.range(U32_LEN..key.len() - 3)?,
);
(hash, key.deserialize_u8(key.len() - 3)?)
}
len @ (1..=7) => {
hash[..len].copy_from_slice(
key.range(U32_LEN..key.len() - 2)?,
);
(hash, len as u8)
}
invalid => {
return Err(format!(
"Invalid text bitmap key length {invalid}"
)
.into())
}
};
bitmaps
.entry((account_id, collection))
.or_default()
.insert(BitmapClass::Text {
field: key.deserialize_u8(key.len() - 1)?,
token: BitmapHash { hash, len },
});
}
_ => unreachable!(),
}
Ok(true)
@ -950,6 +952,7 @@ impl Core {
)
.await
.failed("Failed to iterate over data store");
}
for ((account_id, collection), classes) in bitmaps {
writer
@ -965,7 +968,7 @@ impl Core {
account_id,
collection,
class: class.clone(),
block_num: 0,
document_id: 0,
})
.await
.failed("Failed to get bitmap")
@ -988,11 +991,6 @@ impl Core {
key.push(field);
key.extend_from_slice(&text);
}
TagValue::Static(id) => {
key.push(3u8);
key.push(field);
key.push(id);
}
}
key

View file

@ -32,7 +32,7 @@ use store::{
roaring::RoaringBitmap,
write::{
key::DeserializeBigEndian, BatchBuilder, BitmapClass, BitmapHash, BlobOp, DirectoryClass,
LookupClass, Operation, TagValue, ValueClass,
LookupClass, MaybeDynamicId, MaybeDynamicValue, Operation, TagValue, ValueClass,
},
BlobStore, Store, U32_LEN,
};
@ -159,7 +159,8 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
}
Family::Directory => {
let key = key.as_slice();
let class = match key.first().expect("Failed to read directory key type") {
let class: DirectoryClass<MaybeDynamicId> =
match key.first().expect("Failed to read directory key type") {
0 => DirectoryClass::NameToId(
key.get(1..)
.expect("Failed to read directory string")
@ -170,12 +171,12 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
.expect("Failed to read directory string")
.to_vec(),
),
2 => DirectoryClass::Principal(
2 => DirectoryClass::Principal(MaybeDynamicId::Static(
key.get(1..)
.expect("Failed to read range for principal id")
.deserialize_leb128()
.deserialize_leb128::<u32>()
.expect("Failed to deserialize principal id"),
),
)),
3 => DirectoryClass::Domain(
key.get(1..)
.expect("Failed to read directory string")
@ -195,20 +196,24 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
continue;
}
5 => DirectoryClass::MemberOf {
principal_id: key
.deserialize_be_u32(1)
principal_id: MaybeDynamicId::Static(
key.deserialize_be_u32(1)
.expect("Failed to read principal id"),
member_of: key
.deserialize_be_u32(1 + U32_LEN)
),
member_of: MaybeDynamicId::Static(
key.deserialize_be_u32(1 + U32_LEN)
.expect("Failed to read principal id"),
),
},
6 => DirectoryClass::Members {
principal_id: key
.deserialize_be_u32(1)
principal_id: MaybeDynamicId::Static(
key.deserialize_be_u32(1)
.expect("Failed to read principal id"),
has_member: key
.deserialize_be_u32(1 + U32_LEN)
),
has_member: MaybeDynamicId::Static(
key.deserialize_be_u32(1 + U32_LEN)
.expect("Failed to read principal id"),
),
},
_ => failed("Invalid directory key"),
@ -253,13 +258,14 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
let document_ids = RoaringBitmap::deserialize_from(&value[..])
.expect("Failed to deserialize bitmap");
let key = key.as_slice();
let class = match key.first().expect("Failed to read bitmap class") {
let class: BitmapClass<MaybeDynamicId> =
match key.first().expect("Failed to read bitmap class") {
0 => BitmapClass::DocumentIds,
1 => BitmapClass::Tag {
field: key.get(1).copied().expect("Failed to read field"),
value: TagValue::Id(
value: TagValue::Id(MaybeDynamicId::Static(
key.deserialize_be_u32(2).expect("Failed to read tag id"),
),
)),
},
2 => BitmapClass::Tag {
field: key.get(1).copied().expect("Failed to read field"),
@ -269,9 +275,12 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
},
3 => BitmapClass::Tag {
field: key.get(1).copied().expect("Failed to read field"),
value: TagValue::Static(
key.get(2).copied().expect("Failed to read tag static id"),
),
value: TagValue::Id(MaybeDynamicId::Static(
key.get(2)
.copied()
.expect("Failed to read tag static id")
.into(),
)),
},
4 => BitmapClass::Text {
field: key.get(1).copied().expect("Failed to read field"),
@ -307,13 +316,14 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
}
}
Family::Log => {
batch.ops.push(Operation::Log {
batch.ops.push(Operation::ChangeId {
change_id: key
.as_slice()
.deserialize_be_u64(0)
.expect("Failed to deserialize change id"),
collection,
set: value,
});
batch.ops.push(Operation::Log {
set: MaybeDynamicValue::Static(value),
});
}
Family::None => failed("No family specified in file"),

View file

@ -26,7 +26,7 @@ use std::{borrow::Cow, collections::HashSet};
use store::{
write::{
assert::HashedValue, BatchBuilder, BitmapClass, BitmapHash, IntoOperations, Operation,
TagValue, TokenizeText, ValueClass, ValueOp,
TokenizeText, ValueClass, ValueOp,
},
Serialize,
};
@ -634,12 +634,3 @@ impl<T> From<Property> for ValueClass<T> {
ValueClass::Property(value.into())
}
}
impl<T> From<Property> for BitmapClass<T> {
fn from(value: Property) -> Self {
BitmapClass::Tag {
field: value.into(),
value: TagValue::Static(0),
}
}
}

View file

@ -24,7 +24,9 @@
use std::fmt::Display;
use store::{
write::{BitmapClass, DeserializeFrom, Operation, SerializeInto, TagValue, ToBitmaps},
write::{
BitmapClass, DeserializeFrom, MaybeDynamicId, Operation, SerializeInto, TagValue, ToBitmaps,
},
Serialize,
};
use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
@ -285,42 +287,76 @@ impl DeserializeFrom for Keyword {
}
}
impl<T> From<Keyword> for TagValue<T> {
fn from(value: Keyword) -> Self {
match value {
Keyword::Seen => TagValue::Static(SEEN as u8),
Keyword::Draft => TagValue::Static(DRAFT as u8),
Keyword::Flagged => TagValue::Static(FLAGGED as u8),
Keyword::Answered => TagValue::Static(ANSWERED as u8),
Keyword::Recent => TagValue::Static(RECENT as u8),
Keyword::Important => TagValue::Static(IMPORTANT as u8),
Keyword::Phishing => TagValue::Static(PHISHING as u8),
Keyword::Junk => TagValue::Static(JUNK as u8),
Keyword::NotJunk => TagValue::Static(NOTJUNK as u8),
Keyword::Deleted => TagValue::Static(DELETED as u8),
Keyword::Forwarded => TagValue::Static(FORWARDED as u8),
Keyword::MdnSent => TagValue::Static(MDN_SENT as u8),
Keyword::Other(string) => TagValue::Text(string.into_bytes()),
impl Keyword {
pub fn id(&self) -> Result<u32, String> {
match self {
Keyword::Seen => Ok(SEEN as u32),
Keyword::Draft => Ok(DRAFT as u32),
Keyword::Flagged => Ok(FLAGGED as u32),
Keyword::Answered => Ok(ANSWERED as u32),
Keyword::Recent => Ok(RECENT as u32),
Keyword::Important => Ok(IMPORTANT as u32),
Keyword::Phishing => Ok(PHISHING as u32),
Keyword::Junk => Ok(JUNK as u32),
Keyword::NotJunk => Ok(NOTJUNK as u32),
Keyword::Deleted => Ok(DELETED as u32),
Keyword::Forwarded => Ok(FORWARDED as u32),
Keyword::MdnSent => Ok(MDN_SENT as u32),
Keyword::Other(string) => Err(string.clone()),
}
}
pub fn into_id(self) -> Result<u32, String> {
match self {
Keyword::Seen => Ok(SEEN as u32),
Keyword::Draft => Ok(DRAFT as u32),
Keyword::Flagged => Ok(FLAGGED as u32),
Keyword::Answered => Ok(ANSWERED as u32),
Keyword::Recent => Ok(RECENT as u32),
Keyword::Important => Ok(IMPORTANT as u32),
Keyword::Phishing => Ok(PHISHING as u32),
Keyword::Junk => Ok(JUNK as u32),
Keyword::NotJunk => Ok(NOTJUNK as u32),
Keyword::Deleted => Ok(DELETED as u32),
Keyword::Forwarded => Ok(FORWARDED as u32),
Keyword::MdnSent => Ok(MDN_SENT as u32),
Keyword::Other(string) => Err(string),
}
}
}
impl<T> From<&Keyword> for TagValue<T> {
impl From<Keyword> for TagValue<u32> {
fn from(value: Keyword) -> Self {
match value.into_id() {
Ok(id) => TagValue::Id(id),
Err(string) => TagValue::Text(string.into_bytes()),
}
}
}
impl From<&Keyword> for TagValue<u32> {
fn from(value: &Keyword) -> Self {
match value {
Keyword::Seen => TagValue::Static(SEEN as u8),
Keyword::Draft => TagValue::Static(DRAFT as u8),
Keyword::Flagged => TagValue::Static(FLAGGED as u8),
Keyword::Answered => TagValue::Static(ANSWERED as u8),
Keyword::Recent => TagValue::Static(RECENT as u8),
Keyword::Important => TagValue::Static(IMPORTANT as u8),
Keyword::Phishing => TagValue::Static(PHISHING as u8),
Keyword::Junk => TagValue::Static(JUNK as u8),
Keyword::NotJunk => TagValue::Static(NOTJUNK as u8),
Keyword::Deleted => TagValue::Static(DELETED as u8),
Keyword::Forwarded => TagValue::Static(FORWARDED as u8),
Keyword::MdnSent => TagValue::Static(MDN_SENT as u8),
Keyword::Other(string) => TagValue::Text(string.as_bytes().to_vec()),
match value.id() {
Ok(id) => TagValue::Id(id),
Err(string) => TagValue::Text(string.into_bytes()),
}
}
}
impl From<Keyword> for TagValue<MaybeDynamicId> {
fn from(value: Keyword) -> Self {
match value.into_id() {
Ok(id) => TagValue::Id(MaybeDynamicId::Static(id)),
Err(string) => TagValue::Text(string.into_bytes()),
}
}
}
impl From<&Keyword> for TagValue<MaybeDynamicId> {
fn from(value: &Keyword) -> Self {
match value.id() {
Ok(id) => TagValue::Id(MaybeDynamicId::Static(id)),
Err(string) => TagValue::Text(string.into_bytes()),
}
}
}

View file

@ -58,6 +58,7 @@ impl FdbStore {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
let mut change_id = u64::MAX;
let mut result = AssignedIds::default();
let trx = self.db.create_trx()?;
@ -79,6 +80,11 @@ impl FdbStore {
} => {
document_id = *document_id_;
}
Operation::ChangeId {
change_id: change_id_,
} => {
change_id = *change_id_;
}
Operation::Value { class, op } => {
let mut key = class.serialize(
account_id,
@ -235,7 +241,7 @@ impl FdbStore {
let key = LogKey {
account_id,
collection,
change_id: batch.change_id,
change_id,
}
.serialize(WITH_SUBSPACE);
trx.set(&key, set.resolve(&result)?.as_ref());

View file

@ -84,6 +84,7 @@ impl MysqlStore {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
let mut change_id = u64::MAX;
let mut asserted_values = AHashMap::new();
let mut tx_opts = TxOpts::default();
tx_opts
@ -109,6 +110,11 @@ impl MysqlStore {
} => {
document_id = *document_id_;
}
Operation::ChangeId {
change_id: change_id_,
} => {
change_id = *change_id_;
}
Operation::Value { class, op } => {
let key =
class.serialize(account_id, collection, document_id, 0, (&result).into());
@ -290,7 +296,7 @@ impl MysqlStore {
let key = LogKey {
account_id,
collection,
change_id: batch.change_id,
change_id,
}
.serialize(0);

View file

@ -97,6 +97,7 @@ impl PostgresStore {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
let mut change_id = u64::MAX;
let mut asserted_values = AHashMap::new();
let trx = conn
.build_transaction()
@ -122,6 +123,11 @@ impl PostgresStore {
} => {
document_id = *document_id_;
}
Operation::ChangeId {
change_id: change_id_,
} => {
change_id = *change_id_;
}
Operation::Value { class, op } => {
let key =
class.serialize(account_id, collection, document_id, 0, (&result).into());
@ -299,7 +305,7 @@ impl PostgresStore {
let key = LogKey {
account_id,
collection,
change_id: batch.change_id,
change_id,
}
.serialize(0);

View file

@ -173,6 +173,7 @@ impl<'x> RocksDBTransaction<'x> {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
let mut change_id = u64::MAX;
let mut result = AssignedIds::default();
let txn = self
@ -196,6 +197,11 @@ impl<'x> RocksDBTransaction<'x> {
} => {
document_id = *document_id_;
}
Operation::ChangeId {
change_id: change_id_,
} => {
change_id = *change_id_;
}
Operation::Value { class, op } => {
let key =
class.serialize(account_id, collection, document_id, 0, (&result).into());
@ -297,7 +303,7 @@ impl<'x> RocksDBTransaction<'x> {
let key = LogKey {
account_id,
collection,
change_id: self.batch.change_id,
change_id,
}
.serialize(0);

View file

@ -41,6 +41,7 @@ impl SqliteStore {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
let mut change_id = u64::MAX;
let trx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
let mut result = AssignedIds::default();
@ -61,6 +62,11 @@ impl SqliteStore {
} => {
document_id = *document_id_;
}
Operation::ChangeId {
change_id: change_id_,
} => {
change_id = *change_id_;
}
Operation::Value { class, op } => {
let key = class.serialize(
account_id,
@ -196,7 +202,7 @@ impl SqliteStore {
let key = LogKey {
account_id,
collection,
change_id: batch.change_id,
change_id,
}
.serialize(0);

View file

@ -31,12 +31,11 @@ impl BatchBuilder {
pub fn new() -> Self {
Self {
ops: Vec::with_capacity(16),
change_id: u64::MAX,
}
}
pub fn with_change_id(&mut self, change_id: u64) -> &mut Self {
self.change_id = change_id;
self.ops.push(Operation::ChangeId { change_id });
self
}
@ -208,16 +207,12 @@ impl BatchBuilder {
}
pub fn build(self) -> Batch {
Batch {
ops: self.ops,
change_id: self.change_id,
}
Batch { ops: self.ops }
}
pub fn build_batch(&mut self) -> Batch {
Batch {
ops: std::mem::take(&mut self.ops),
change_id: self.change_id,
}
}

View file

@ -449,18 +449,9 @@ impl<T: ResolveId> BitmapClass<T> {
KeySerializer::new((U32_LEN * 2) + 3)
}
.write(account_id)
.write(collection | BM_MARKER)
.write(*field)
.write_leb128(id.resolve_id(assigned_ids)),
TagValue::Static(id) => if (flags & WITH_SUBSPACE) != 0 {
KeySerializer::new(U32_LEN + 5).write(SUBSPACE_BITMAP_TAG)
} else {
KeySerializer::new(U32_LEN + 4)
}
.write(account_id)
.write(collection)
.write(*field)
.write(*id),
.write_leb128(id.resolve_id(assigned_ids)),
TagValue::Text(text) => if (flags & WITH_SUBSPACE) != 0 {
KeySerializer::new(U32_LEN + 4 + text.len()).write(SUBSPACE_BITMAP_TAG)
} else {
@ -468,7 +459,7 @@ impl<T: ResolveId> BitmapClass<T> {
}
.write(account_id)
.write(collection)
.write(*field)
.write(*field | BM_MARKER)
.write(text.as_slice()),
},
BitmapClass::Text { field, token } => {

View file

@ -144,7 +144,7 @@ impl ChangeLogBuilder {
impl IntoOperations for ChangeLogBuilder {
fn build(self, batch: &mut super::BatchBuilder) {
batch.change_id = self.change_id;
batch.with_change_id(self.change_id);
for (collection, changes) in self.changes {
batch.ops.push(Operation::Collection { collection });
batch.ops.push(Operation::Log {

View file

@ -97,13 +97,11 @@ pub const F_CLEAR: u32 = 1 << 3;
#[derive(Debug)]
pub struct Batch {
pub ops: Vec<Operation>,
pub change_id: u64,
}
#[derive(Debug)]
pub struct BatchBuilder {
pub ops: Vec<Operation>,
pub change_id: u64,
}
#[derive(Debug, PartialEq, Eq, Hash)]
@ -117,6 +115,9 @@ pub enum Operation {
DocumentId {
document_id: u32,
},
ChangeId {
change_id: u64,
},
AssertValue {
class: ValueClass<MaybeDynamicId>,
assert_value: AssertValue,
@ -156,7 +157,6 @@ pub struct BitmapHash {
pub enum TagValue<T> {
Id(T),
Text(Vec<u8>),
Static(u8),
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
@ -275,9 +275,15 @@ impl<T> From<String> for TagValue<T> {
}
}
impl<T> From<u8> for TagValue<T> {
impl From<u8> for TagValue<u32> {
fn from(value: u8) -> Self {
TagValue::Static(value)
TagValue::Id(value as u32)
}
}
impl From<u8> for TagValue<MaybeDynamicId> {
fn from(value: u8) -> Self {
TagValue::Id(MaybeDynamicId::Static(value as u32))
}
}

View file

@ -28,10 +28,9 @@ use store::{
rand,
write::{
AnyKey, BatchBuilder, BitmapClass, BitmapHash, BlobOp, DirectoryClass, LookupClass,
Operation, QueueClass, QueueEvent, TagValue, ValueClass,
MaybeDynamicId, MaybeDynamicValue, Operation, QueueClass, QueueEvent, TagValue, ValueClass,
},
IterateParams, Store, SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_INDEXES,
SUBSPACE_LOGS, SUBSPACE_VALUES,
*,
};
use utils::BlobHash;
@ -75,7 +74,7 @@ pub async fn test(db: Store) {
batch.with_collection(collection);
for document_id in [0, 10, 20, 30, 40] {
batch.create_document(document_id);
batch.create_document_with_id(document_id);
if collection == u8::from(Collection::Mailbox) {
batch
@ -113,25 +112,23 @@ pub async fn test(db: Store) {
);
}
batch.ops.push(Operation::Log {
batch.ops.push(Operation::ChangeId {
change_id: document_id as u64 + account_id as u64 + collection as u64,
});
batch.ops.push(Operation::Log {
set: MaybeDynamicValue::Static(vec![
account_id as u8,
collection,
set: vec![account_id as u8, collection, document_id as u8],
document_id as u8,
]),
});
for field in 0..5 {
batch.ops.push(Operation::Bitmap {
class: BitmapClass::Tag {
field,
value: TagValue::Id(rand::random()),
},
set: true,
});
batch.ops.push(Operation::Bitmap {
class: BitmapClass::Tag {
field,
value: TagValue::Static(rand::random()),
value: TagValue::Id(MaybeDynamicId::Static(rand::random())),
},
set: true,
});
@ -203,7 +200,7 @@ pub async fn test(db: Store) {
for account_id in [1, 2, 3, 4, 5] {
batch
.create_document(account_id)
.create_document_with_id(account_id)
.add(
ValueClass::Directory(DirectoryClass::UsedQuota(account_id)),
rand::random(),
@ -227,20 +224,22 @@ pub async fn test(db: Store) {
random_bytes(4),
)
.set(
ValueClass::Directory(DirectoryClass::Principal(account_id)),
ValueClass::Directory(DirectoryClass::Principal(MaybeDynamicId::Static(
account_id,
))),
random_bytes(30),
)
.set(
ValueClass::Directory(DirectoryClass::MemberOf {
principal_id: account_id,
member_of: rand::random(),
principal_id: MaybeDynamicId::Static(account_id),
member_of: MaybeDynamicId::Static(rand::random()),
}),
random_bytes(15),
)
.set(
ValueClass::Directory(DirectoryClass::Members {
principal_id: account_id,
has_member: rand::random(),
principal_id: MaybeDynamicId::Static(account_id),
has_member: MaybeDynamicId::Static(rand::random()),
}),
random_bytes(15),
);
@ -290,14 +289,6 @@ struct KeyValue {
impl Snapshot {
async fn new(db: &Store) -> Self {
#[cfg(feature = "rocks")]
let is_rocks = matches!(db, Store::RocksDb(_));
#[cfg(not(feature = "rocks"))]
let is_rocks = false;
#[cfg(feature = "foundationdb")]
let is_fdb = matches!(db, Store::FoundationDb(_));
#[cfg(not(feature = "foundationdb"))]
let is_fdb = false;
let is_sql = matches!(
db,
Store::SQLite(_) | Store::PostgreSQL(_) | Store::MySQL(_)
@ -306,12 +297,27 @@ impl Snapshot {
let mut keys = AHashSet::new();
for (subspace, with_values) in [
(SUBSPACE_VALUES, true),
(SUBSPACE_COUNTERS, !is_sql),
(SUBSPACE_ACL, true),
(SUBSPACE_BITMAP_ID, false),
(SUBSPACE_BITMAP_TAG, false),
(SUBSPACE_BITMAP_TEXT, false),
(SUBSPACE_DIRECTORY, true),
(SUBSPACE_FTS_INDEX, true),
(SUBSPACE_INDEXES, false),
(SUBSPACE_BLOB_RESERVE, true),
(SUBSPACE_BLOB_LINK, true),
(SUBSPACE_BLOBS, true),
(SUBSPACE_LOGS, true),
(SUBSPACE_BITMAPS, is_rocks | is_fdb),
(SUBSPACE_INDEXES, false),
(SUBSPACE_COUNTER, !is_sql),
(SUBSPACE_LOOKUP_VALUE, true),
(SUBSPACE_PROPERTY, true),
(SUBSPACE_SETTINGS, true),
(SUBSPACE_QUEUE_MESSAGE, true),
(SUBSPACE_QUEUE_EVENT, true),
(SUBSPACE_QUOTA, !is_sql),
(SUBSPACE_REPORT_OUT, true),
(SUBSPACE_REPORT_IN, true),
(SUBSPACE_TERM_INDEX, true),
] {
let from_key = AnyKey {
subspace,