feat: compare frontiers causal order (#257)

This commit is contained in:
Zixuan Chen 2024-01-22 12:03:50 +08:00 committed by GitHub
parent 0998342001
commit 680041f3a9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 189 additions and 29 deletions

View file

@ -24,6 +24,7 @@ use crate::{
handler::TextHandler,
handler::TreeHandler,
id::PeerID,
oplog::dag::FrontiersNotIncluded,
version::Frontiers,
InternalString, LoroError, VersionVector,
};
@ -574,8 +575,20 @@ impl LoroDoc {
/// - Ordering::Equal means versions equal
/// - Ordering::Greater means self's version is greater than target
#[inline]
pub fn cmp_frontiers(&self, other: &Frontiers) -> Ordering {
self.oplog().lock().unwrap().cmp_frontiers(other)
pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
self.oplog().lock().unwrap().cmp_with_frontiers(other)
}
/// Compare two [Frontiers] causally.
///
/// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
#[inline]
pub fn cmp_frontiers(
&self,
a: &Frontiers,
b: &Frontiers,
) -> Result<Option<Ordering>, FrontiersNotIncluded> {
self.oplog().lock().unwrap().cmp_frontiers(a, b)
}
pub fn subscribe_root(&self, callback: Subscriber) -> SubID {
@ -721,11 +734,13 @@ impl LoroDoc {
Arc::downgrade(&self.state)
}
#[inline]
pub fn len_ops(&self) -> usize {
let oplog = self.oplog.lock().unwrap();
oplog.vv().iter().map(|(_, ops)| *ops).sum::<i32>() as usize
}
#[inline]
pub fn len_changes(&self) -> usize {
let oplog = self.oplog.lock().unwrap();
oplog.len_changes()
@ -750,6 +765,7 @@ impl LoroDoc {
self.renew_txn_if_auto_commit();
}
#[inline]
pub fn log_estimated_size(&self) {
let state = self.state.try_lock().unwrap();
state.log_estimated_size();

View file

@ -26,6 +26,7 @@ use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
type ClientChanges = FxHashMap<PeerID, Vec<Change>>;
pub use self::dag::FrontiersNotIncluded;
use self::pending_changes::PendingChanges;
use super::arena::SharedArena;
@ -432,8 +433,20 @@ impl OpLog {
/// - Ordering::Less means self is less than target or parallel
/// - Ordering::Equal means versions equal
/// - Ordering::Greater means self's version is greater than target
pub fn cmp_frontiers(&self, other: &Frontiers) -> Ordering {
self.dag.cmp_frontiers(other)
pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
self.dag.cmp_with_frontiers(other)
}
/// Compare two [Frontiers] causally.
///
/// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
#[inline]
pub fn cmp_frontiers(
&self,
a: &Frontiers,
b: &Frontiers,
) -> Result<Option<Ordering>, FrontiersNotIncluded> {
self.dag.cmp_frontiers(a, b)
}
pub(crate) fn get_min_lamport_at(&self, id: ID) -> Lamport {

View file

@ -1,4 +1,5 @@
use std::cmp::Ordering;
use std::fmt::{Display, Write};
use crate::change::Lamport;
use crate::dag::{Dag, DagNode};
@ -251,7 +252,7 @@ impl AppDag {
/// - Ordering::Less means self is less than target or parallel
/// - Ordering::Equal means versions equal
/// - Ordering::Greater means self's version is greater than target
pub fn cmp_frontiers(&self, other: &Frontiers) -> Ordering {
pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
if &self.frontiers == other {
Ordering::Equal
} else if other.iter().all(|id| self.vv.includes_id(*id)) {
@ -260,4 +261,26 @@ impl AppDag {
Ordering::Less
}
}
// PERF
/// Compare two [Frontiers] causally.
///
/// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
pub fn cmp_frontiers(
&self,
a: &Frontiers,
b: &Frontiers,
) -> Result<Option<Ordering>, FrontiersNotIncluded> {
let a = self.frontiers_to_vv(a).ok_or(FrontiersNotIncluded)?;
let b = self.frontiers_to_vv(b).ok_or(FrontiersNotIncluded)?;
Ok(a.partial_cmp(&b))
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct FrontiersNotIncluded;
impl Display for FrontiersNotIncluded {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("The given Frontiers are not included by the doc")
}
}

View file

@ -167,6 +167,12 @@ impl From<&[ID]> for Frontiers {
}
}
impl<const N: usize> From<[ID; N]> for Frontiers {
fn from(value: [ID; N]) -> Self {
Self(value.as_slice().into())
}
}
impl From<ID> for Frontiers {
fn from(value: ID) -> Self {
Self([value].into())

View file

@ -99,6 +99,8 @@ extern "C" {
pub type JsTextStyles;
#[wasm_bindgen(typescript_type = "Delta<string>[]")]
pub type JsDelta;
#[wasm_bindgen(typescript_type = "-1 | 1 | 0 | undefined")]
pub type JsPartialOrd;
}
mod observer {
@ -625,17 +627,47 @@ impl Loro {
///
/// Frontiers cannot be compared without the history of the OpLog.
///
#[inline]
#[wasm_bindgen(js_name = "cmpFrontiers")]
pub fn cmp_frontiers(&self, frontiers: Vec<JsID>) -> JsResult<i32> {
#[wasm_bindgen(js_name = "cmpWithFrontiers")]
pub fn cmp_with_frontiers(&self, frontiers: Vec<JsID>) -> JsResult<i32> {
let frontiers = ids_to_frontiers(frontiers)?;
Ok(match self.0.cmp_frontiers(&frontiers) {
Ok(match self.0.cmp_with_frontiers(&frontiers) {
Ordering::Less => -1,
Ordering::Greater => 1,
Ordering::Equal => 0,
})
}
/// Compare the ordering of two Frontiers.
///
/// It's assumed that both Frontiers are included by the doc. Otherwise, an error will be thrown.
///
/// Return value:
///
/// - -1: a < b
/// - 0: a == b
/// - 1: a > b
/// - undefined: a ∥ b: a and b are concurrent
#[wasm_bindgen(js_name = "cmpFrontiers")]
pub fn cmp_frontiers(&self, a: Vec<JsID>, b: Vec<JsID>) -> JsResult<JsPartialOrd> {
let a = ids_to_frontiers(a)?;
let b = ids_to_frontiers(b)?;
let c = self
.0
.cmp_frontiers(&a, &b)
.map_err(|e| JsError::new(&e.to_string()))?;
if let Some(c) = c {
let v: JsValue = match c {
Ordering::Less => -1,
Ordering::Greater => 1,
Ordering::Equal => 0,
}
.into();
Ok(v.into())
} else {
Ok(JsValue::UNDEFINED.into())
}
}
/// Export the snapshot of current version, it's include all content of
/// operations and states
#[wasm_bindgen(js_name = "exportSnapshot")]

View file

@ -21,6 +21,7 @@ pub use loro_internal::container::richtext::ExpandType;
pub use loro_internal::container::{ContainerID, ContainerType};
pub use loro_internal::obs::SubID;
pub use loro_internal::obs::Subscriber;
pub use loro_internal::oplog::FrontiersNotIncluded;
pub use loro_internal::version::{Frontiers, VersionVector};
pub use loro_internal::DiffEvent;
pub use loro_internal::{loro_value, to_value};
@ -78,8 +79,16 @@ impl LoroDoc {
self.doc.checkout(frontiers)
}
pub fn cmp_frontiers(&self, other: &Frontiers) -> Ordering {
self.doc.cmp_frontiers(other)
pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
self.doc.cmp_with_frontiers(other)
}
pub fn cmp_frontiers(
&self,
a: &Frontiers,
b: &Frontiers,
) -> Result<Option<Ordering>, FrontiersNotIncluded> {
self.doc.cmp_frontiers(a, b)
}
/// Force the document enter the detached mode.

View file

@ -1,7 +1,46 @@
use std::sync::Arc;
use std::{cmp::Ordering, sync::Arc};
use loro::LoroDoc;
use loro_internal::{delta::DeltaItem, handler::TextDelta, DiffEvent, LoroResult};
use loro::{FrontiersNotIncluded, LoroDoc};
use loro_internal::{delta::DeltaItem, handler::TextDelta, id::ID, DiffEvent, LoroResult};
#[test]
fn cmp_frontiers() {
let doc1 = LoroDoc::new();
doc1.set_peer_id(1).unwrap();
doc1.get_text("text").insert(0, "012345").unwrap();
let doc2 = LoroDoc::new();
doc2.set_peer_id(2).unwrap();
doc2.import(&doc1.export_snapshot()).unwrap();
doc2.get_text("text").insert(0, "6789").unwrap();
doc1.import(&doc2.export_snapshot()).unwrap();
doc1.get_text("text").insert(0, "0123").unwrap();
doc1.commit();
assert_eq!(
doc1.cmp_frontiers(&[].into(), &[ID::new(2, 5)].into()),
Err(FrontiersNotIncluded)
);
assert_eq!(
doc1.cmp_frontiers(&[ID::new(1, 2)].into(), &[ID::new(2, 3)].into()),
Ok(Some(Ordering::Less))
);
assert_eq!(
doc1.cmp_frontiers(&[ID::new(1, 5)].into(), &[ID::new(2, 3)].into()),
Ok(Some(Ordering::Less))
);
assert_eq!(
doc1.cmp_frontiers(&[ID::new(1, 6)].into(), &[ID::new(2, 3)].into()),
Ok(Some(Ordering::Greater))
);
assert_eq!(
doc1.cmp_frontiers(&[].into(), &[].into()),
Ok(Some(Ordering::Equal))
);
assert_eq!(
doc1.cmp_frontiers(&[ID::new(1, 6)].into(), &[ID::new(1, 6)].into()),
Ok(Some(Ordering::Equal))
);
}
#[test]
fn get_change_at_lamport() {

View file

@ -251,31 +251,31 @@ it("get change with given lamport", () => {
doc1.getText("text").insert(0, "01234");
doc1.commit();
{
const change = doc1.getChangeAtLamport("1", 1);
const change = doc1.getChangeAtLamport("1", 1)!;
expect(change.lamport).toBe(0);
expect(change.peer).toBe("1");
expect(change.length).toBe(5);
}
{
const change = doc1.getChangeAtLamport("1", 7);
const change = doc1.getChangeAtLamport("1", 7)!;
expect(change.lamport).toBe(0);
expect(change.peer).toBe("1");
expect(change.length).toBe(5);
}
{
const change = doc1.getChangeAtLamport("1", 10);
const change = doc1.getChangeAtLamport("1", 10)!;
expect(change.lamport).toBe(10);
expect(change.peer).toBe("1");
expect(change.length).toBe(5);
}
{
const change = doc1.getChangeAtLamport("1", 13);
const change = doc1.getChangeAtLamport("1", 13)!;
expect(change.lamport).toBe(10);
expect(change.peer).toBe("1");
expect(change.length).toBe(5);
}
{
const change = doc1.getChangeAtLamport("1", 20);
const change = doc1.getChangeAtLamport("1", 20)!;
expect(change.lamport).toBe(10);
expect(change.peer).toBe("1");
expect(change.length).toBe(5);

View file

@ -57,17 +57,17 @@ describe("Checkout", () => {
const v0 = doc.frontiers();
const docB = new Loro();
docB.import(doc.exportFrom());
expect(docB.cmpFrontiers(v0)).toBe(0);
expect(docB.cmpWithFrontiers(v0)).toBe(0);
text.insert(1, "0");
doc.commit();
expect(docB.cmpFrontiers(doc.frontiers())).toBe(-1);
expect(docB.cmpWithFrontiers(doc.frontiers())).toBe(-1);
const textB = docB.getText("text");
textB.insert(0, "0");
docB.commit();
expect(docB.cmpFrontiers(doc.frontiers())).toBe(-1);
expect(docB.cmpWithFrontiers(doc.frontiers())).toBe(-1);
docB.import(doc.exportFrom());
expect(docB.cmpFrontiers(doc.frontiers())).toBe(1);
expect(docB.cmpWithFrontiers(doc.frontiers())).toBe(1);
doc.import(docB.exportFrom());
expect(docB.cmpFrontiers(doc.frontiers())).toBe(0);
expect(docB.cmpWithFrontiers(doc.frontiers())).toBe(0);
});
});

View file

@ -4,26 +4,48 @@ import { Loro, OpId, VersionVector } from "../src";
describe("Frontiers", () => {
it("two clients", () => {
const doc = new Loro();
doc.setPeerId(0);
const text = doc.getText("text");
text.insert(0, "0");
doc.commit();
const v0 = doc.frontiers();
const docB = new Loro();
docB.setPeerId(1);
docB.import(doc.exportFrom());
expect(docB.cmpFrontiers(v0)).toBe(0);
expect(docB.cmpWithFrontiers(v0)).toBe(0);
text.insert(1, "0");
doc.commit();
expect(docB.cmpFrontiers(doc.frontiers())).toBe(-1);
expect(docB.cmpWithFrontiers(doc.frontiers())).toBe(-1);
const textB = docB.getText("text");
textB.insert(0, "0");
docB.commit();
expect(docB.cmpFrontiers(doc.frontiers())).toBe(-1);
expect(docB.cmpWithFrontiers(doc.frontiers())).toBe(-1);
docB.import(doc.exportFrom());
expect(docB.cmpFrontiers(doc.frontiers())).toBe(1);
expect(docB.cmpWithFrontiers(doc.frontiers())).toBe(1);
doc.import(docB.exportFrom());
expect(docB.cmpFrontiers(doc.frontiers())).toBe(0);
expect(docB.cmpWithFrontiers(doc.frontiers())).toBe(0);
});
it("cmp frontiers", () => {
const doc1 = new Loro();
doc1.setPeerId(1);
const doc2 = new Loro();
doc2.setPeerId(2n);
doc1.getText("text").insert(0, "01234");
doc2.import(doc1.exportFrom());
doc2.getText("text").insert(0, "56789");
doc1.import(doc2.exportFrom());
doc1.getText("text").insert(0, "01234");
doc1.commit();
expect(() => { doc1.cmpFrontiers([{ peer: "1", counter: 1 }], [{ peer: "2", counter: 10 }]) }).toThrow();
expect(doc1.cmpFrontiers([], [{ peer: "1", counter: 1 }])).toBe(-1)
expect(doc1.cmpFrontiers([], [])).toBe(0)
expect(doc1.cmpFrontiers([{ peer: "1", counter: 4 }], [{ peer: "2", counter: 3 }])).toBe(-1)
expect(doc1.cmpFrontiers([{ peer: "1", counter: 5 }], [{ peer: "2", counter: 3 }])).toBe(1)
})
});
it('peer id repr should be consistent', () => {