From 8dd249a7cd3f814c54ba0250672ddb37929c4250 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 22 Feb 2023 16:04:29 +0100 Subject: [PATCH] Hold room lock through the entirety of a `room_transaction` Previously, when the host repeatedly sent `UpdateWorktree` messages, new guests attempting to join a project would observe a severe slowdown caused by a database serialization error (e.g., the coherence of the data would get violated midway through `Database::join_project` due to worktree entries being mutated as the user joined). Writing entries is pretty fast, whereas reading all of them for a project can take more than 100ms. Transactions that failed due to a serialization error are retried, but the guest would keep retrying until the host finished writing because the guest's read was slow. This commit changes the semantics of `room_transaction` to acquire a room lock before even starting the transaction and holding it all the way after commit (storing it, as before, in the `RoomGuard`). This ensures that a fast writer (the host) can't starve a slow reader (the guest), allowing the latter to make progress by temporarily pausing writes by the former. --- crates/collab/src/db.rs | 198 +++++++++++++++++++++++----------------- 1 file changed, 112 insertions(+), 86 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index af30073ab4..f63d92d9a1 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -157,7 +157,7 @@ impl Database { room_id: RoomId, new_server_id: ServerId, ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { let stale_participant_filter = Condition::all() .add(room_participant::Column::RoomId.eq(room_id)) .add(room_participant::Column::AnsweringConnectionId.is_not_null()) @@ -193,14 +193,11 @@ impl Database { room::Entity::delete_by_id(room_id).exec(&*tx).await?; } - Ok(( - room_id, - RefreshedRoom { - room, - stale_participant_user_ids, - canceled_calls_to_user_ids, - }, - )) + Ok(RefreshedRoom { + room, + stale_participant_user_ids, + canceled_calls_to_user_ids, + }) }) .await } @@ -1129,18 +1126,16 @@ impl Database { user_id: UserId, connection: ConnectionId, live_kit_room: &str, - ) -> Result> { - self.room_transaction(|tx| async move { + ) -> Result { + self.transaction(|tx| async move { let room = room::ActiveModel { live_kit_room: ActiveValue::set(live_kit_room.into()), ..Default::default() } .insert(&*tx) .await?; - let room_id = room.id; - room_participant::ActiveModel { - room_id: ActiveValue::set(room_id), + room_id: ActiveValue::set(room.id), user_id: ActiveValue::set(user_id), answering_connection_id: ActiveValue::set(Some(connection.id as i32)), answering_connection_server_id: ActiveValue::set(Some(ServerId( @@ -1157,8 +1152,8 @@ impl Database { .insert(&*tx) .await?; - let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + let room = self.get_room(room.id, &tx).await?; + Ok(room) }) .await } @@ -1171,7 +1166,7 @@ impl Database { called_user_id: UserId, initial_project_id: Option, ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { room_participant::ActiveModel { room_id: ActiveValue::set(room_id), user_id: ActiveValue::set(called_user_id), @@ -1190,7 +1185,7 @@ impl Database { let room = self.get_room(room_id, &tx).await?; let incoming_call = Self::build_incoming_call(&room, called_user_id) .ok_or_else(|| anyhow!("failed to build incoming call"))?; - Ok((room_id, (room, incoming_call))) + Ok((room, incoming_call)) }) .await } @@ -1200,7 +1195,7 @@ impl Database { room_id: RoomId, called_user_id: UserId, ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { room_participant::Entity::delete_many() .filter( room_participant::Column::RoomId @@ -1210,7 +1205,7 @@ impl Database { .exec(&*tx) .await?; let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + Ok(room) }) .await } @@ -1257,7 +1252,7 @@ impl Database { calling_connection: ConnectionId, called_user_id: UserId, ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { let participant = room_participant::Entity::find() .filter( Condition::all() @@ -1276,14 +1271,13 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("no call to cancel"))?; - let room_id = participant.room_id; room_participant::Entity::delete(participant.into_active_model()) .exec(&*tx) .await?; let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + Ok(room) }) .await } @@ -1294,7 +1288,7 @@ impl Database { user_id: UserId, connection: ConnectionId, ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { let result = room_participant::Entity::update_many() .filter( Condition::all() @@ -1316,7 +1310,7 @@ impl Database { Err(anyhow!("room does not exist or was already joined"))? } else { let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + Ok(room) } }) .await @@ -1328,9 +1322,9 @@ impl Database { user_id: UserId, connection: ConnectionId, ) -> Result> { - self.room_transaction(|tx| async { + let room_id = RoomId::from_proto(rejoin_room.id); + self.room_transaction(room_id, |tx| async { let tx = tx; - let room_id = RoomId::from_proto(rejoin_room.id); let participant_update = room_participant::Entity::update_many() .filter( Condition::all() @@ -1549,14 +1543,11 @@ impl Database { } let room = self.get_room(room_id, &tx).await?; - Ok(( - room_id, - RejoinedRoom { - room, - rejoined_projects, - reshared_projects, - }, - )) + Ok(RejoinedRoom { + room, + rejoined_projects, + reshared_projects, + }) }) .await } @@ -1723,7 +1714,7 @@ impl Database { connection: ConnectionId, location: proto::ParticipantLocation, ) -> Result> { - self.room_transaction(|tx| async { + self.room_transaction(room_id, |tx| async { let tx = tx; let location_kind; let location_project_id; @@ -1769,7 +1760,7 @@ impl Database { if result.rows_affected == 1 { let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + Ok(room) } else { Err(anyhow!("could not update room participant location"))? } @@ -1963,7 +1954,7 @@ impl Database { connection: ConnectionId, worktrees: &[proto::WorktreeMetadata], ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { let participant = room_participant::Entity::find() .filter( Condition::all() @@ -2024,7 +2015,7 @@ impl Database { .await?; let room = self.get_room(room_id, &tx).await?; - Ok((room_id, (project.id, room))) + Ok((project.id, room)) }) .await } @@ -2034,7 +2025,8 @@ impl Database { project_id: ProjectId, connection: ConnectionId, ) -> Result)>> { - self.room_transaction(|tx| async move { + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; let project = project::Entity::find_by_id(project_id) @@ -2042,12 +2034,11 @@ impl Database { .await? .ok_or_else(|| anyhow!("project not found"))?; if project.host_connection()? == connection { - let room_id = project.room_id; project::Entity::delete(project.into_active_model()) .exec(&*tx) .await?; let room = self.get_room(room_id, &tx).await?; - Ok((room_id, (room, guest_connection_ids))) + Ok((room, guest_connection_ids)) } else { Err(anyhow!("cannot unshare a project hosted by another user"))? } @@ -2061,7 +2052,8 @@ impl Database { connection: ConnectionId, worktrees: &[proto::WorktreeMetadata], ) -> Result)>> { - self.room_transaction(|tx| async move { + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let project = project::Entity::find_by_id(project_id) .filter( Condition::all() @@ -2079,7 +2071,7 @@ impl Database { let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?; let room = self.get_room(project.room_id, &tx).await?; - Ok((project.room_id, (room, guest_connection_ids))) + Ok((room, guest_connection_ids)) }) .await } @@ -2124,12 +2116,12 @@ impl Database { update: &proto::UpdateWorktree, connection: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - let project_id = ProjectId::from_proto(update.project_id); - let worktree_id = update.worktree_id as i64; - + let project_id = ProjectId::from_proto(update.project_id); + let worktree_id = update.worktree_id as i64; + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { // Ensure the update comes from the host. - let project = project::Entity::find_by_id(project_id) + let _project = project::Entity::find_by_id(project_id) .filter( Condition::all() .add(project::Column::HostConnectionId.eq(connection.id as i32)) @@ -2140,7 +2132,6 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; - let room_id = project.room_id; // Update metadata. worktree::Entity::update(worktree::ActiveModel { @@ -2220,7 +2211,7 @@ impl Database { } let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; - Ok((room_id, connection_ids)) + Ok(connection_ids) }) .await } @@ -2230,9 +2221,10 @@ impl Database { update: &proto::UpdateDiagnosticSummary, connection: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - let project_id = ProjectId::from_proto(update.project_id); - let worktree_id = update.worktree_id as i64; + let project_id = ProjectId::from_proto(update.project_id); + let worktree_id = update.worktree_id as i64; + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let summary = update .summary .as_ref() @@ -2274,7 +2266,7 @@ impl Database { .await?; let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; - Ok((project.room_id, connection_ids)) + Ok(connection_ids) }) .await } @@ -2284,8 +2276,9 @@ impl Database { update: &proto::StartLanguageServer, connection: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - let project_id = ProjectId::from_proto(update.project_id); + let project_id = ProjectId::from_proto(update.project_id); + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let server = update .server .as_ref() @@ -2319,7 +2312,7 @@ impl Database { .await?; let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; - Ok((project.room_id, connection_ids)) + Ok(connection_ids) }) .await } @@ -2329,7 +2322,8 @@ impl Database { project_id: ProjectId, connection: ConnectionId, ) -> Result> { - self.room_transaction(|tx| async move { + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let participant = room_participant::Entity::find() .filter( Condition::all() @@ -2455,7 +2449,6 @@ impl Database { .all(&*tx) .await?; - let room_id = project.room_id; let project = Project { collaborators: collaborators .into_iter() @@ -2475,7 +2468,7 @@ impl Database { }) .collect(), }; - Ok((room_id, (project, replica_id as ReplicaId))) + Ok((project, replica_id as ReplicaId)) }) .await } @@ -2485,7 +2478,8 @@ impl Database { project_id: ProjectId, connection: ConnectionId, ) -> Result> { - self.room_transaction(|tx| async move { + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let result = project_collaborator::Entity::delete_many() .filter( Condition::all() @@ -2521,7 +2515,7 @@ impl Database { host_connection_id: project.host_connection()?, connection_ids, }; - Ok((project.room_id, left_project)) + Ok(left_project) }) .await } @@ -2531,11 +2525,8 @@ impl Database { project_id: ProjectId, connection_id: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - let project = project::Entity::find_by_id(project_id) - .one(&*tx) - .await? - .ok_or_else(|| anyhow!("no such project"))?; + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let collaborators = project_collaborator::Entity::find() .filter(project_collaborator::Column::ProjectId.eq(project_id)) .all(&*tx) @@ -2553,7 +2544,7 @@ impl Database { .iter() .any(|collaborator| collaborator.connection_id == connection_id) { - Ok((project.room_id, collaborators)) + Ok(collaborators) } else { Err(anyhow!("no such project"))? } @@ -2566,11 +2557,8 @@ impl Database { project_id: ProjectId, connection_id: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - let project = project::Entity::find_by_id(project_id) - .one(&*tx) - .await? - .ok_or_else(|| anyhow!("no such project"))?; + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let mut collaborators = project_collaborator::Entity::find() .filter(project_collaborator::Column::ProjectId.eq(project_id)) .stream(&*tx) @@ -2583,7 +2571,7 @@ impl Database { } if connection_ids.contains(&connection_id) { - Ok((project.room_id, connection_ids)) + Ok(connection_ids) } else { Err(anyhow!("no such project"))? } @@ -2613,6 +2601,17 @@ impl Database { Ok(guest_connection_ids) } + async fn room_id_for_project(&self, project_id: ProjectId) -> Result { + self.transaction(|tx| async move { + let project = project::Entity::find_by_id(project_id) + .one(&*tx) + .await? + .ok_or_else(|| anyhow!("project {} not found", project_id))?; + Ok(project.room_id) + }) + .await + } + // access tokens pub async fn create_access_token_hash( @@ -2763,21 +2762,48 @@ impl Database { self.run(body).await } - async fn room_transaction(&self, f: F) -> Result> + async fn room_transaction(&self, room_id: RoomId, f: F) -> Result> where F: Send + Fn(TransactionHandle) -> Fut, - Fut: Send + Future>, + Fut: Send + Future>, { - let data = self - .optional_room_transaction(move |tx| { - let future = f(tx); - async { - let data = future.await?; - Ok(Some(data)) + let body = async { + loop { + let lock = self.rooms.entry(room_id).or_default().clone(); + let _guard = lock.lock_owned().await; + let (tx, result) = self.with_transaction(&f).await?; + match result { + Ok(data) => { + match tx.commit().await.map_err(Into::into) { + Ok(()) => { + return Ok(RoomGuard { + data, + _guard, + _not_send: PhantomData, + }); + } + Err(error) => { + if is_serialization_error(&error) { + // Retry (don't break the loop) + } else { + return Err(error); + } + } + } + } + Err(error) => { + tx.rollback().await?; + if is_serialization_error(&error) { + // Retry (don't break the loop) + } else { + return Err(error); + } + } } - }) - .await?; - Ok(data.unwrap()) + } + }; + + self.run(body).await } async fn with_transaction(&self, f: &F) -> Result<(DatabaseTransaction, Result)>