Skip to content

Commit

Permalink
remove update handle as soon as listeners are gone
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Oct 29, 2024
1 parent 05600f7 commit 1b0cd1e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 73 deletions.
94 changes: 28 additions & 66 deletions crates/corro-agent/src/api/public/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::api::public::pubsub::MatcherUpsertError;
pub type UpdateBroadcastCache = HashMap<Uuid, broadcast::Sender<Bytes>>;
pub type SharedUpdateBroadcastCache = Arc<TokioRwLock<UpdateBroadcastCache>>;

const MAX_UNSUB_TIME: Duration = Duration::from_secs(120);
// this should be a fraction of the MAX_UNSUB_TIME
const RECEIVERS_CHECK_INTERVAL: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -107,88 +106,51 @@ pub async fn process_update_channel(
) {
let mut buf = BytesMut::new();

let mut deadline = if tx.receiver_count() == 0 {
Some(Box::pin(tokio::time::sleep(MAX_UNSUB_TIME)))
} else {
None
};

// even if there are no more subscribers
// interval check for receivers
// useful for queries that don't change often so we can cleanup...
let mut subs_check = tokio::time::interval(RECEIVERS_CHECK_INTERVAL);

loop {
let deadline_check = async {
if let Some(sleep) = deadline.as_mut() {
sleep.await
} else {
futures::future::pending().await
}
};

let notify_evt = tokio::select! {
tokio::select! {
biased;
Some(query_evt) = evt_rx.recv() => query_evt,
_ = deadline_check => {
if tx.receiver_count() == 0 {
info!(sub_id = %id, "All listeners for subscription are gone and didn't come back within {MAX_UNSUB_TIME:?}");
break;
}

// reset the deadline if there are receivers!
deadline = None;
continue;
Some(query_evt) = evt_rx.recv() => {
match make_query_event_bytes(&mut buf, &query_evt) {
Ok(b) => {
if tx.send(b).is_err() {
break;
}
},
Err(e) => {
match make_query_event_bytes(&mut buf, &NotifyEvent::Error(e.to_compact_string())) {
Ok(b) => {
let _ = tx.send(b);
}
Err(e) => {
warn!(update_id = %id, "failed to send error in update channel: {e}");
}
}
break;
}
};
},
_ = subs_check.tick() => {
if tx.receiver_count() == 0 {
if deadline.is_none() {
deadline = Some(Box::pin(tokio::time::sleep(MAX_UNSUB_TIME)));
}
} else {
deadline = None;
break;
};
continue;
},
else => {
break;
}
};

let is_still_active = match make_query_event_bytes(&mut buf, &notify_evt) {
Ok(b) => tx.send(b).is_ok(),
Err(e) => {
match make_query_event_bytes(&mut buf, &NotifyEvent::Error(e.to_compact_string())) {
Ok(b) => {
let _ = tx.send(b);
}
Err(e) => {
warn!(update_id = %id, "error sending error: {e}");
}
}
break;
}
};

if is_still_active {
deadline = None;
} else {
debug!(sub_id = %id, "no active listeners to receive subscription event: {notify_evt:?}");
if deadline.is_none() {
deadline = Some(Box::pin(tokio::time::sleep(MAX_UNSUB_TIME)));
}
}
}

warn!(sub_id = %id, "subscription query channel done");
warn!(sub_id = %id, "updates channel done");

// remove and get handle from the agent's "matchers"
let handle = match updates.remove(&id) {
Some(h) => {
info!(update_id = %id, "Removed update from process_update_channel");
info!(update_id = %id, "Removed update handle from process_update_channel");
h
}
None => {
warn!(update_id = %id, "subscription handle was already gone. odd!");
warn!(update_id = %id, "update handle was already gone. odd!");
return;
}
};
Expand Down Expand Up @@ -234,7 +196,7 @@ async fn forward_update_bytes_to_body_sender(
buf.extend_from_slice(&event_buf);
if buf.len() >= 64 * 1024 {
if let Err(e) = tx.send_data(buf.split().freeze()).await {
warn!(update_id = %update.id(), "could not forward subscription query event to receiver: {e}");
warn!(update_id = %update.id(), "could not forward update query event to receiver: {e}");
return;
}
};
Expand Down Expand Up @@ -265,8 +227,8 @@ async fn forward_update_bytes_to_body_sender(
}
},
_ = update.cancelled() => {
// info!(update_id = %update.id(), "update cancelled, aborting forwarding bytes to subscriber");
// return;
info!(update_id = %update.id(), "update cancelled, aborting forwarding bytes to subscriber");
return;
},
_ = &mut tripwire => {
break;
Expand Down
14 changes: 7 additions & 7 deletions crates/corro-types/src/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,9 +430,9 @@ where
}

// metrics...
for (table, pks) in candidates.clone() {
for (table, pks) in candidates.iter() {
handle
.get_counter(&table)
.get_counter(table)
.matched_count
.increment(pks.len() as u64);
}
Expand All @@ -441,14 +441,14 @@ where

if let Err(e) = handle
.changes_tx()
.try_send((candidates.clone(), db_version))
.try_send((candidates, db_version))
{
error!(sub_id = %id, "could not send change candidates to {trait_type} handler: {e}");
match e {
mpsc::error::TrySendError::Full(item) => {
warn!("channel is full, falling back to async send");

let changes_tx = handle.changes_tx().clone();
let changes_tx = handle.changes_tx();
tokio::spawn(async move {
_ = changes_tx.send(item).await;
});
Expand Down Expand Up @@ -521,11 +521,11 @@ where
// metrics...
for (id, (candidates, handle)) in candidates {
let mut match_count = 0;
for (table, pks) in candidates.clone() {
for (table, pks) in candidates.iter() {
let count = pks.len();
match_count += count;
handle
.get_counter(&table)
.get_counter(table)
.matched_count
.increment(pks.len() as u64);
}
Expand All @@ -534,7 +534,7 @@ where

if let Err(e) = handle
.changes_tx()
.try_send((candidates.clone(), db_version))
.try_send((candidates, db_version))
{
error!(sub_id = %id, "could not send change candidates to {trait_type} handler: {e}");
match e {
Expand Down

0 comments on commit 1b0cd1e

Please sign in to comment.