From 5ab85e9b76eace8b12ef771ff5b68f556f613873 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Fri, 16 Aug 2024 16:30:50 +0100 Subject: [PATCH 1/3] drop items when processing queue is full --- crates/corro-agent/src/agent/handlers.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 56b3245f..5e02747c 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -742,6 +742,7 @@ pub async fn handle_changes( agent.config().perf.apply_queue_timeout as u64, )); + const MAX_QUEUE_LEN: usize = 10000; const MAX_SEEN_CACHE_LEN: usize = 10000; const KEEP_SEEN_CACHE_SIZE: usize = 1000; let mut seen: IndexMap<_, RangeInclusiveSet> = IndexMap::new(); @@ -861,6 +862,15 @@ pub async fn handle_changes( continue; } + // drop items when the queue is full. + if queue.len() > MAX_QUEUE_LEN { + warn!( + "dropping changes from {} because changes queue is full", + change.actor_id + ); + continue; + } + if let Some(mut seqs) = change.seqs().cloned() { let v = *change.versions().start(); if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) { From 7045a76d692a44879551619c4940f51abf30b161 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Fri, 16 Aug 2024 17:22:19 +0100 Subject: [PATCH 2/3] make max queue len configurable --- crates/corro-agent/src/agent/handlers.rs | 4 ++-- crates/corro-types/src/config.rs | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 5e02747c..40c45617 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -731,6 +731,7 @@ pub async fn handle_changes( mut tripwire: Tripwire, ) { let max_changes_chunk: usize = agent.config().perf.apply_queue_len; + let max_queue_len: usize = agent.config().perf.processing_queue_len; let mut queue: VecDeque<(ChangeV1, ChangeSource, Instant)> = VecDeque::new(); let mut buf = vec![]; let mut buf_cost = 0; @@ -742,7 +743,6 @@ pub async fn handle_changes( agent.config().perf.apply_queue_timeout as u64, )); - const MAX_QUEUE_LEN: usize = 10000; const MAX_SEEN_CACHE_LEN: usize = 10000; const KEEP_SEEN_CACHE_SIZE: usize = 1000; let mut seen: IndexMap<_, RangeInclusiveSet> = IndexMap::new(); @@ -863,7 +863,7 @@ pub async fn handle_changes( } // drop items when the queue is full. - if queue.len() > MAX_QUEUE_LEN { + if queue.len() > max_queue_len { warn!( "dropping changes from {} because changes queue is full", change.actor_id diff --git a/crates/corro-types/src/config.rs b/crates/corro-types/src/config.rs index 348bd362..7681bfb9 100644 --- a/crates/corro-types/src/config.rs +++ b/crates/corro-types/src/config.rs @@ -14,6 +14,9 @@ const fn default_apply_queue() -> usize { const fn default_wal_threshold() -> usize { 10 } +const fn default_processing_queue() -> usize { + 100000 +} /// Used for the apply channel const fn default_huge_channel() -> usize { @@ -187,6 +190,8 @@ pub struct PerfConfig { pub apply_queue_len: usize, #[serde(default = "default_wal_threshold")] pub wal_threshold_gb: usize, + #[serde(default = "default_processing_queue")] + pub processing_queue_len: usize, } impl Default for PerfConfig { @@ -204,6 +209,7 @@ impl Default for PerfConfig { apply_queue_timeout: default_apply_timeout(), apply_queue_len: default_apply_queue(), wal_threshold_gb: default_wal_threshold(), + processing_queue_len: default_processing_queue(), } } } From 8c7d3b7ac5770d832aa90f621718243a0da87c6a Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Wed, 21 Aug 2024 22:31:51 +0100 Subject: [PATCH 3/3] log line every power of 10 to avoid spamming --- crates/corro-agent/src/agent/handlers.rs | 28 ++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 40c45617..76c12743 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -747,6 +747,7 @@ pub async fn handle_changes( const KEEP_SEEN_CACHE_SIZE: usize = 1000; let mut seen: IndexMap<_, RangeInclusiveSet> = IndexMap::new(); + let mut drop_log_count: u64 = 0; // complicated loop to process changes efficiently w/ a max concurrency // and a minimum chunk size for bigger and faster SQLite transactions loop { @@ -864,10 +865,21 @@ pub async fn handle_changes( // drop items when the queue is full. if queue.len() > max_queue_len { - warn!( - "dropping changes from {} because changes queue is full", - change.actor_id - ); + drop_log_count += 1; + if is_pow_10(drop_log_count) { + if drop_log_count == 1 { + warn!("dropping a change because changes queue is full"); + } else { + warn!( + "dropping {} changes because changes queue is full", + drop_log_count + ); + } + } + // reset count + if drop_log_count == 100000000 { + drop_log_count = 0; + } continue; } @@ -1152,3 +1164,11 @@ mod tests { Ok(()) } } + +#[inline] +fn is_pow_10(i: u64) -> bool { + matches!( + i, + 1 | 10 | 100 | 1000 | 10000 | 1000000 | 10000000 | 100000000 + ) +}