From 82e68799bdcd7806f006b2174c2bc6505274d681 Mon Sep 17 00:00:00 2001 From: Pavel Borzenkov Date: Tue, 30 Jul 2024 10:37:18 +0200 Subject: [PATCH] pubsub: fix a subtle race in catch up logic for barely active subs The order of event processing when Corrosion is handling a change is: - determine changes - send changes to the channel - commit changes to sub DB When a Corrosion client is catching up it subscribes to changes sent to the channel and queries the full sub DB. It's possible that the query is run *before* the changes are committed, but after they've been sent to the channel, so what happens is: * A change with ID N is processed by Corrosion and sent to the channel * catch up logic starts and subscribes to changes, but doesn't see this change * sub DB is read and sent to the client, last committed change ID is N - 1, so EndOfQuery{change_id: N - 1} is sent to the client * corrosion commits the last update so the last change in the DB becomes N * catch up handler has min_change_id = N - 1 + 1, so when it compares last sent change ID (N) with min_change_id (N) it ignores it because the comparison is strictly greater. min_change_id is what we expect to receive, so the fix is trivial - comparison should be greater or equal. --- crates/corro-agent/src/api/public/pubsub.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/corro-agent/src/api/public/pubsub.rs b/crates/corro-agent/src/api/public/pubsub.rs index 1f7775c4..9e692313 100644 --- a/crates/corro-agent/src/api/public/pubsub.rs +++ b/crates/corro-agent/src/api/public/pubsub.rs @@ -524,7 +524,7 @@ pub async fn catch_up_sub( for i in 0..5 { min_change_id = last_change_id + 1; - if change_id > min_change_id { + if change_id >= min_change_id { // missed some updates! 
info!(sub_id = %matcher.id(), "attempt #{} to catch up subcription from change id: {change_id:?} (last: {last_change_id:?})", i+1); @@ -547,7 +547,7 @@ pub async fn catch_up_sub( // sleep 100 millis tokio::time::sleep(Duration::from_millis(100)).await; } - if change_id > min_change_id { + if change_id >= min_change_id { _ = evt_tx .send(error_to_query_event_bytes_with_meta( &mut buf,