Restructure communication from BackgroundScanner to LocalWorktree

The worktree no longer pulls the background snapshot from the background scanner.
Instead, the background scanner sends both snapshots to the worktree. Along with
these, it sends the path change sets.

Also, add randomized test coverage for the worktree UpdatedEntries events.
This commit is contained in:
Max Brunsfeld 2023-03-20 18:22:23 -07:00
parent cbeb6e692d
commit d742c758bc

View file

@ -64,10 +64,8 @@ pub enum Worktree {
pub struct LocalWorktree {
snapshot: LocalSnapshot,
background_snapshot: Arc<Mutex<LocalSnapshot>>,
background_changes: Arc<Mutex<HashMap<Arc<Path>, PathChange>>>,
last_scan_state_rx: watch::Receiver<ScanState>,
is_scanning: (watch::Sender<bool>, watch::Receiver<bool>),
_background_scanner_task: Task<()>,
poll_task: Option<Task<()>>,
share: Option<ShareState>,
diagnostics: HashMap<Arc<Path>, Vec<DiagnosticEntry<Unclipped<PointUtf16>>>>,
diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
@ -123,6 +121,7 @@ impl std::fmt::Debug for GitRepositoryEntry {
}
}
#[derive(Debug)]
pub struct LocalSnapshot {
ignores_by_parent_abs_path: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
git_repositories: Vec<GitRepositoryEntry>,
@ -159,11 +158,12 @@ impl DerefMut for LocalSnapshot {
#[derive(Clone, Debug)]
enum ScanState {
Idle,
/// The worktree is performing its initial scan of the filesystem.
Initializing,
Initializing(LocalSnapshot),
Initialized(LocalSnapshot),
/// The worktree is updating in response to filesystem events.
Updating,
Updated(LocalSnapshot, HashMap<Arc<Path>, PathChange>),
Err(Arc<anyhow::Error>),
}
@ -235,49 +235,37 @@ impl Worktree {
}
let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded();
let (mut last_scan_state_tx, last_scan_state_rx) =
watch::channel_with(ScanState::Initializing);
let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
let background_changes = Arc::new(Mutex::new(HashMap::default()));
cx.spawn_weak(|this, mut cx| async move {
while let Some(scan_state) = scan_states_rx.next().await {
if let Some(this) = this.upgrade(&cx) {
last_scan_state_tx.blocking_send(scan_state).ok();
this.update(&mut cx, |this, cx| {
this.as_local_mut().unwrap().poll_snapshot(false, cx)
});
} else {
break;
}
while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade(&cx)) {
this.update(&mut cx, |this, cx| {
this.as_local_mut()
.unwrap()
.background_scanner_updated(state, cx);
});
}
})
.detach();
let background_scanner_task = cx.background().spawn({
let fs = fs.clone();
let background_snapshot = background_snapshot.clone();
let background = cx.background().clone();
async move {
let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background)
.run(events)
.await;
}
});
Worktree::Local(LocalWorktree {
_background_scanner_task: cx.background().spawn({
let fs = fs.clone();
let background_snapshot = background_snapshot.clone();
let background_changes = background_changes.clone();
let background = cx.background().clone();
async move {
let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
let scanner = BackgroundScanner::new(
background_snapshot,
background_changes,
scan_states_tx,
fs,
background,
);
scanner.run(events).await;
}
}),
snapshot,
background_snapshot,
background_changes,
last_scan_state_rx,
is_scanning: watch::channel_with(true),
share: None,
poll_task: None,
_background_scanner_task: background_scanner_task,
diagnostics: Default::default(),
diagnostic_summaries: Default::default(),
client,
@ -335,7 +323,9 @@ impl Worktree {
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
let this = this.as_remote_mut().unwrap();
this.poll_snapshot(cx);
this.snapshot = this.background_snapshot.lock().clone();
cx.emit(Event::UpdatedEntries(Default::default()));
cx.notify();
while let Some((scan_id, _)) = this.snapshot_subscriptions.front() {
if this.observed_snapshot(*scan_id) {
let (_, tx) = this.snapshot_subscriptions.pop_front().unwrap();
@ -539,66 +529,49 @@ impl LocalWorktree {
Ok(updated)
}
fn poll_snapshot(&mut self, force: bool, cx: &mut ModelContext<Worktree>) {
self.poll_task.take();
match self.last_scan_state_rx.borrow().clone() {
ScanState::Idle => {
let new_snapshot = self.background_snapshot.lock().clone();
let changes = mem::take(&mut *self.background_changes.lock());
let updated_repos = Self::changed_repos(
&self.snapshot.git_repositories,
&new_snapshot.git_repositories,
);
self.snapshot = new_snapshot;
if let Some(share) = self.share.as_mut() {
*share.snapshots_tx.borrow_mut() = self.snapshot.clone();
}
fn background_scanner_updated(
&mut self,
scan_state: ScanState,
cx: &mut ModelContext<Worktree>,
) {
match scan_state {
ScanState::Initializing(new_snapshot) => {
*self.is_scanning.0.borrow_mut() = true;
self.set_snapshot(new_snapshot, cx);
}
ScanState::Initialized(new_snapshot) => {
*self.is_scanning.0.borrow_mut() = false;
self.set_snapshot(new_snapshot, cx);
}
ScanState::Updating => {
*self.is_scanning.0.borrow_mut() = true;
}
ScanState::Updated(new_snapshot, changes) => {
*self.is_scanning.0.borrow_mut() = false;
cx.emit(Event::UpdatedEntries(changes));
if !updated_repos.is_empty() {
cx.emit(Event::UpdatedGitRepositories(updated_repos));
}
self.set_snapshot(new_snapshot, cx);
}
ScanState::Initializing => {
let is_fake_fs = self.fs.is_fake();
let new_snapshot = self.background_snapshot.lock().clone();
let updated_repos = Self::changed_repos(
&self.snapshot.git_repositories,
&new_snapshot.git_repositories,
);
self.snapshot = new_snapshot;
self.poll_task = Some(cx.spawn_weak(|this, mut cx| async move {
if is_fake_fs {
#[cfg(any(test, feature = "test-support"))]
cx.background().simulate_random_delay().await;
} else {
smol::Timer::after(Duration::from_millis(100)).await;
}
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
this.as_local_mut().unwrap().poll_snapshot(false, cx)
});
}
}));
if !updated_repos.is_empty() {
cx.emit(Event::UpdatedGitRepositories(updated_repos));
}
}
_ => {
if force {
self.snapshot = self.background_snapshot.lock().clone();
}
ScanState::Err(error) => {
*self.is_scanning.0.borrow_mut() = false;
log::error!("error scanning worktree {:?}", error);
}
}
}
fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext<Worktree>) {
let updated_repos = Self::changed_repos(
&self.snapshot.git_repositories,
&new_snapshot.git_repositories,
);
self.snapshot = new_snapshot;
if let Some(share) = self.share.as_mut() {
*share.snapshots_tx.borrow_mut() = self.snapshot.clone();
}
if !updated_repos.is_empty() {
cx.emit(Event::UpdatedGitRepositories(updated_repos));
}
cx.notify();
}
@ -631,11 +604,15 @@ impl LocalWorktree {
}
pub fn scan_complete(&self) -> impl Future<Output = ()> {
let mut scan_state_rx = self.last_scan_state_rx.clone();
let mut is_scanning_rx = self.is_scanning.1.clone();
async move {
let mut scan_state = Some(scan_state_rx.borrow().clone());
while let Some(ScanState::Initializing | ScanState::Updating) = scan_state {
scan_state = scan_state_rx.recv().await;
let mut is_scanning = is_scanning_rx.borrow().clone();
while is_scanning {
if let Some(value) = is_scanning_rx.recv().await {
is_scanning = value;
} else {
break;
}
}
}
}
@ -827,11 +804,14 @@ impl LocalWorktree {
delete.await?;
this.update(&mut cx, |this, cx| {
let this = this.as_local_mut().unwrap();
{
let mut snapshot = this.background_snapshot.lock();
snapshot.delete_entry(entry_id);
this.background_snapshot.lock().delete_entry(entry_id);
if let Some(path) = this.snapshot.delete_entry(entry_id) {
cx.emit(Event::UpdatedEntries(
[(path, PathChange::Removed)].into_iter().collect(),
));
}
this.poll_snapshot(true, cx);
});
Ok(())
}))
@ -953,13 +933,8 @@ impl LocalWorktree {
cx: &mut ModelContext<Worktree>,
) -> Task<Result<Entry>> {
let fs = self.fs.clone();
let root_char_bag;
let next_entry_id;
{
let snapshot = self.background_snapshot.lock();
root_char_bag = snapshot.root_char_bag;
next_entry_id = snapshot.next_entry_id.clone();
}
let root_char_bag = self.snapshot.root_char_bag;
let next_entry_id = self.snapshot.next_entry_id.clone();
cx.spawn_weak(|this, mut cx| async move {
let metadata = fs
.metadata(&abs_path)
@ -970,32 +945,43 @@ impl LocalWorktree {
.ok_or_else(|| anyhow!("worktree was dropped"))?;
this.update(&mut cx, |this, cx| {
let this = this.as_local_mut().unwrap();
let inserted_entry;
let mut entry = Entry::new(path, &metadata, &next_entry_id, root_char_bag);
entry.is_ignored = this
.snapshot
.ignore_stack_for_abs_path(&abs_path, entry.is_dir())
.is_abs_path_ignored(&abs_path, entry.is_dir());
{
let mut snapshot = this.background_snapshot.lock();
let mut changes = this.background_changes.lock();
let mut entry = Entry::new(path, &metadata, &next_entry_id, root_char_bag);
entry.is_ignored = snapshot
.ignore_stack_for_abs_path(&abs_path, entry.is_dir())
.is_abs_path_ignored(&abs_path, entry.is_dir());
if let Some(old_path) = old_path {
snapshot.remove_path(&old_path);
changes.insert(old_path.clone(), PathChange::Removed);
}
snapshot.scan_started();
let exists = snapshot.entry_for_path(&entry.path).is_some();
inserted_entry = snapshot.insert_entry(entry, fs.as_ref());
changes.insert(
inserted_entry.path.clone(),
if exists {
PathChange::Updated
} else {
PathChange::Added
},
);
if let Some(old_path) = &old_path {
snapshot.remove_path(old_path);
}
snapshot.insert_entry(entry.clone(), fs.as_ref());
snapshot.scan_completed();
}
this.poll_snapshot(true, cx);
let mut changes = HashMap::default();
this.snapshot.scan_started();
if let Some(old_path) = &old_path {
this.snapshot.remove_path(old_path);
changes.insert(old_path.clone(), PathChange::Removed);
}
let exists = this.snapshot.entry_for_path(&entry.path).is_some();
let inserted_entry = this.snapshot.insert_entry(entry, fs.as_ref());
changes.insert(
inserted_entry.path.clone(),
if exists {
PathChange::Updated
} else {
PathChange::Added
},
);
this.snapshot.scan_completed();
eprintln!("refreshed {:?}", changes);
cx.emit(Event::UpdatedEntries(changes));
Ok(inserted_entry)
})
})
@ -1099,12 +1085,6 @@ impl RemoteWorktree {
self.snapshot.clone()
}
fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
self.snapshot = self.background_snapshot.lock().clone();
cx.emit(Event::UpdatedEntries(Default::default()));
cx.notify();
}
pub fn disconnected_from_host(&mut self) {
self.updates_tx.take();
self.snapshot_subscriptions.clear();
@ -1264,28 +1244,25 @@ impl Snapshot {
Ok(entry)
}
fn delete_entry(&mut self, entry_id: ProjectEntryId) -> bool {
if let Some(removed_entry) = self.entries_by_id.remove(&entry_id, &()) {
self.entries_by_path = {
let mut cursor = self.entries_by_path.cursor();
let mut new_entries_by_path =
cursor.slice(&TraversalTarget::Path(&removed_entry.path), Bias::Left, &());
while let Some(entry) = cursor.item() {
if entry.path.starts_with(&removed_entry.path) {
self.entries_by_id.remove(&entry.id, &());
cursor.next(&());
} else {
break;
}
fn delete_entry(&mut self, entry_id: ProjectEntryId) -> Option<Arc<Path>> {
let removed_entry = self.entries_by_id.remove(&entry_id, &())?;
self.entries_by_path = {
let mut cursor = self.entries_by_path.cursor();
let mut new_entries_by_path =
cursor.slice(&TraversalTarget::Path(&removed_entry.path), Bias::Left, &());
while let Some(entry) = cursor.item() {
if entry.path.starts_with(&removed_entry.path) {
self.entries_by_id.remove(&entry.id, &());
cursor.next(&());
} else {
break;
}
new_entries_by_path.push_tree(cursor.suffix(&()), &());
new_entries_by_path
};
}
new_entries_by_path.push_tree(cursor.suffix(&()), &());
new_entries_by_path
};
true
} else {
false
}
Some(removed_entry.path)
}
pub(crate) fn apply_remote_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
@ -2204,7 +2181,7 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
struct BackgroundScanner {
fs: Arc<dyn Fs>,
snapshot: Arc<Mutex<LocalSnapshot>>,
changes: Arc<Mutex<HashMap<Arc<Path>, PathChange>>>,
changes: HashMap<Arc<Path>, PathChange>,
notify: UnboundedSender<ScanState>,
executor: Arc<executor::Background>,
}
@ -2212,7 +2189,6 @@ struct BackgroundScanner {
impl BackgroundScanner {
fn new(
snapshot: Arc<Mutex<LocalSnapshot>>,
changes: Arc<Mutex<HashMap<Arc<Path>, PathChange>>>,
notify: UnboundedSender<ScanState>,
fs: Arc<dyn Fs>,
executor: Arc<executor::Background>,
@ -2220,9 +2196,9 @@ impl BackgroundScanner {
Self {
fs,
snapshot,
changes,
notify,
executor,
changes: Default::default(),
}
}
@ -2231,10 +2207,34 @@ impl BackgroundScanner {
}
async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
if self.notify.unbounded_send(ScanState::Initializing).is_err() {
return;
}
// While performing the initial scan, send a new snapshot to the main
// thread on a recurring interval.
let initializing_task = self.executor.spawn({
let executor = self.executor.clone();
let snapshot = self.snapshot.clone();
let notify = self.notify.clone();
let is_fake_fs = self.fs.is_fake();
async move {
loop {
if is_fake_fs {
#[cfg(any(test, feature = "test-support"))]
executor.simulate_random_delay().await;
} else {
smol::Timer::after(Duration::from_millis(100)).await;
}
executor.timer(Duration::from_millis(100)).await;
if notify
.unbounded_send(ScanState::Initializing(snapshot.lock().clone()))
.is_err()
{
break;
}
}
}
});
// Scan the entire directory.
if let Err(err) = self.scan_dirs().await {
if self
.notify
@ -2245,7 +2245,13 @@ impl BackgroundScanner {
}
}
if self.notify.unbounded_send(ScanState::Idle).is_err() {
drop(initializing_task);
if self
.notify
.unbounded_send(ScanState::Initialized(self.snapshot.lock().clone()))
.is_err()
{
return;
}
@ -2264,7 +2270,14 @@ impl BackgroundScanner {
if !self.process_events(events, true).await {
return;
}
if self.notify.unbounded_send(ScanState::Idle).is_err() {
if self
.notify
.unbounded_send(ScanState::Updated(
self.snapshot.lock().clone(),
mem::take(&mut self.changes),
))
.is_err()
{
return;
}
}
@ -2280,7 +2293,14 @@ impl BackgroundScanner {
if !self.process_events(events, false).await {
return;
}
if self.notify.unbounded_send(ScanState::Idle).is_err() {
if self
.notify
.unbounded_send(ScanState::Updated(
self.snapshot.lock().clone(),
mem::take(&mut self.changes),
))
.is_err()
{
return;
}
}
@ -2737,7 +2757,7 @@ impl BackgroundScanner {
}
fn build_change_set(
&self,
&mut self,
old_snapshot: Snapshot,
event_paths: Vec<Arc<Path>>,
received_before_initialized: bool,
@ -2746,7 +2766,8 @@ impl BackgroundScanner {
let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>();
let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>();
let mut change_set = self.changes.lock();
use PathChange::{Added, AddedOrUpdated, Removed, Updated};
for path in event_paths {
let path = PathKey(path);
old_paths.seek(&path, Bias::Left, &());
@ -2765,7 +2786,7 @@ impl BackgroundScanner {
match Ord::cmp(&old_entry.path, &new_entry.path) {
Ordering::Less => {
change_set.insert(old_entry.path.clone(), PathChange::Removed);
self.changes.insert(old_entry.path.clone(), Removed);
old_paths.next(&());
}
Ordering::Equal => {
@ -2773,26 +2794,25 @@ impl BackgroundScanner {
// If the worktree was not fully initialized when this event was generated,
// we can't know whether this entry was added during the scan or whether
// it was merely updated.
change_set
.insert(old_entry.path.clone(), PathChange::AddedOrUpdated);
self.changes.insert(old_entry.path.clone(), AddedOrUpdated);
} else if old_entry.mtime != new_entry.mtime {
change_set.insert(old_entry.path.clone(), PathChange::Updated);
self.changes.insert(old_entry.path.clone(), Updated);
}
old_paths.next(&());
new_paths.next(&());
}
Ordering::Greater => {
change_set.insert(new_entry.path.clone(), PathChange::Added);
self.changes.insert(new_entry.path.clone(), Added);
new_paths.next(&());
}
}
}
(Some(old_entry), None) => {
change_set.insert(old_entry.path.clone(), PathChange::Removed);
self.changes.insert(old_entry.path.clone(), Removed);
old_paths.next(&());
}
(None, Some(new_entry)) => {
change_set.insert(new_entry.path.clone(), PathChange::Added);
self.changes.insert(new_entry.path.clone(), Added);
new_paths.next(&());
}
(None, None) => break,
@ -3567,6 +3587,49 @@ mod tests {
.update(cx, |tree, _| tree.as_local_mut().unwrap().scan_complete())
.await;
// After the initial scan is complete, the `UpdatedEntries` event can
// be used to follow along with all changes to the worktree's snapshot.
worktree.update(cx, |tree, cx| {
let mut paths = tree
.as_local()
.unwrap()
.paths()
.cloned()
.collect::<Vec<_>>();
cx.subscribe(&worktree, move |tree, _, event, _| {
if let Event::UpdatedEntries(changes) = event {
for (path, change_type) in changes.iter() {
let path = path.clone();
let ix = match paths.binary_search(&path) {
Ok(ix) | Err(ix) => ix,
};
match change_type {
PathChange::Added => {
assert_ne!(paths.get(ix), Some(&path));
paths.insert(ix, path);
}
PathChange::Removed => {
assert_eq!(paths.get(ix), Some(&path));
paths.remove(ix);
}
PathChange::Updated => {
assert_eq!(paths.get(ix), Some(&path));
}
PathChange::AddedOrUpdated => {
if paths[ix] != path {
paths.insert(ix, path);
}
}
}
}
let new_paths = tree.paths().cloned().collect::<Vec<_>>();
assert_eq!(paths, new_paths, "incorrect changes: {:?}", changes);
}
})
.detach();
});
let mut snapshots = Vec::new();
let mut mutations_len = operations;
while mutations_len > 1 {