diff --git a/zed/src/channel.rs b/zed/src/channel.rs index c43cf2e6f7..2eb904915c 100644 --- a/zed/src/channel.rs +++ b/zed/src/channel.rs @@ -190,7 +190,7 @@ impl Channel { rpc: Arc, cx: &mut ModelContext, ) -> Self { - let _subscription = rpc.subscribe_from_model(details.id, cx, Self::handle_message_sent); + let _subscription = rpc.subscribe_to_entity(details.id, cx, Self::handle_message_sent); { let user_store = user_store.clone(); diff --git a/zed/src/rpc.rs b/zed/src/rpc.rs index fe1dde4ffb..6a81bf3c4a 100644 --- a/zed/src/rpc.rs +++ b/zed/src/rpc.rs @@ -230,7 +230,47 @@ impl Client { } } - pub fn subscribe_from_model( + pub fn subscribe(self: &Arc, cx: ModelContext, mut handler: F) -> Subscription + where + T: EnvelopedMessage, + M: Entity, + F: 'static + + Send + + Sync + + FnMut(&mut M, TypedEnvelope, Arc, &mut ModelContext) -> Result<()>, + { + let subscription_id = (TypeId::of::(), Default::default()); + let client = self.clone(); + let mut state = self.state.write(); + let model = cx.handle().downgrade(); + let prev_extractor = state + .entity_id_extractors + .insert(subscription_id.0, Box::new(|_| Default::default())); + if prev_extractor.is_some() { + panic!("registered a handler for the same entity twice") + } + + state.model_handlers.insert( + subscription_id, + Box::new(move |envelope, cx| { + if let Some(model) = model.upgrade(cx) { + let envelope = envelope.into_any().downcast::>().unwrap(); + model.update(cx, |model, cx| { + if let Err(error) = handler(model, *envelope, client.clone(), cx) { + log::error!("error handling message: {}", error) + } + }); + } + }), + ); + + Subscription { + client: Arc::downgrade(self), + id: subscription_id, + } + } + + pub fn subscribe_to_entity( self: &Arc, remote_id: u64, cx: &mut ModelContext, diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 60d453b408..93be506134 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -211,11 +211,11 @@ impl Worktree { } let _subscriptions = vec![ - rpc.subscribe_from_model(remote_id, cx, Self::handle_add_peer), - rpc.subscribe_from_model(remote_id, cx, Self::handle_remove_peer), - rpc.subscribe_from_model(remote_id, cx, Self::handle_update), - rpc.subscribe_from_model(remote_id, cx, Self::handle_update_buffer), - rpc.subscribe_from_model(remote_id, cx, Self::handle_buffer_saved), + rpc.subscribe_to_entity(remote_id, cx, Self::handle_add_peer), + rpc.subscribe_to_entity(remote_id, cx, Self::handle_remove_peer), + rpc.subscribe_to_entity(remote_id, cx, Self::handle_update), + rpc.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer), + rpc.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved), ]; Worktree::Remote(RemoteWorktree { @@ -1070,12 +1070,12 @@ impl LocalWorktree { this.update(&mut cx, |worktree, cx| { let _subscriptions = vec![ - rpc.subscribe_from_model(remote_id, cx, Worktree::handle_add_peer), - rpc.subscribe_from_model(remote_id, cx, Worktree::handle_remove_peer), - rpc.subscribe_from_model(remote_id, cx, Worktree::handle_open_buffer), - rpc.subscribe_from_model(remote_id, cx, Worktree::handle_close_buffer), - rpc.subscribe_from_model(remote_id, cx, Worktree::handle_update_buffer), - rpc.subscribe_from_model(remote_id, cx, Worktree::handle_save_buffer), + rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_peer), + rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_peer), + rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer), + rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer), + rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer), + rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer), ]; let worktree = worktree.as_local_mut().unwrap();