chore: save frontiers to kv as well

This commit is contained in:
Zixuan Chen 2024-08-15 08:44:35 +08:00
parent de8f9e38aa
commit 0aa534a9ba
No known key found for this signature in database
2 changed files with 69 additions and 2 deletions

View file

@ -56,6 +56,9 @@ struct ChangeStoreInner {
mem_parsed_kv: BTreeMap<ID, Arc<ChangesBlock>>,
}
pub const VV_KEY: &[u8] = b"vv";
pub const FRONTIERS_KEY: &[u8] = b"fr";
impl ChangeStore {
pub fn new_mem(a: &SharedArena, merge_interval: Arc<AtomicI64>) -> Self {
Self {
@ -229,14 +232,26 @@ impl ChangeStore {
}
loop {
let old_vv_bytes = store.get(b"vv");
let old_vv_bytes = store.get(VV_KEY);
let new_vv = old_vv_bytes
.as_ref()
.map(|bytes| VersionVector::decode(bytes).unwrap())
.unwrap_or_default();
inner.vv.merge_vv(&new_vv);
let vv_bytes = inner.vv.encode();
if store.compare_and_swap(b"vv", old_vv_bytes, vv_bytes.into()) {
if store.compare_and_swap(VV_KEY, old_vv_bytes, vv_bytes.into()) {
} else {
continue;
}
let old_frontiers_bytes = store.get(FRONTIERS_KEY);
let new_frontiers = old_frontiers_bytes
.as_ref()
.map(|bytes| Frontiers::decode(bytes).unwrap())
.unwrap_or_default();
inner.frontiers.merge_frontiers(&new_frontiers);
let frontiers_bytes = inner.frontiers.encode();
if store.compare_and_swap(FRONTIERS_KEY, old_frontiers_bytes, frontiers_bytes.into()) {
break;
}
}

View file

@ -203,6 +203,58 @@ impl Frontiers {
pub(crate) fn with_capacity(cap: usize) -> Frontiers {
Self(SmallVec::with_capacity(cap))
}
pub(crate) fn merge_frontiers(&mut self, new_frontiers: &Frontiers) {
if self.len() <= 1 {
if self == new_frontiers {
return;
}
if new_frontiers.len() == 0 {
return;
}
if self.len() == 0 {
*self = new_frontiers.clone();
return;
}
if new_frontiers.len() == 1 {
let new_id = new_frontiers[0];
if self[0].peer == new_id.peer {
if self[0].counter < new_id.counter {
self[0].counter = new_id.counter;
} else {
return;
}
} else {
self.push(new_id);
return;
}
}
}
let mut map = self
.0
.iter()
.map(|id| (id.peer, id.counter))
.collect::<FxHashMap<_, _>>();
for id in new_frontiers.0.iter() {
if let Some(counter) = map.get_mut(&id.peer) {
if *counter < id.counter {
*counter = id.counter;
}
} else {
self.0.push(*id);
}
}
self.0 = map
.into_iter()
.map(|(peer, counter)| ID::new(peer, counter))
.collect();
}
}
impl Deref for Frontiers {