From 6b0ac51d65ba0946ae2f13ed032d4ce7e00aa72b Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Wed, 30 Oct 2024 21:30:23 +0100 Subject: [PATCH 1/7] prioritize recent versions during sync --- crates/corro-agent/src/agent/handlers.rs | 13 +++++++++--- crates/corro-agent/src/api/peer.rs | 25 ++++++++++++------------ crates/corro-agent/src/broadcast/mod.rs | 2 +- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 76c12743..284002c2 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -863,15 +863,22 @@ pub async fn handle_changes( continue; } - // drop items when the queue is full. + // drop old items when the queue is full. if queue.len() > max_queue_len { + let change = queue.pop_front(); + if let Some(change) = change { + for v in change.0.versions() { + let _ = seen.remove(&(change.0.actor_id, v)); + } + } + drop_log_count += 1; if is_pow_10(drop_log_count) { if drop_log_count == 1 { - warn!("dropping a change because changes queue is full"); + warn!("dropped an old change because changes queue is full"); } else { warn!( - "dropping {} changes because changes queue is full", + "droppped {} old changes because changes queue is full", drop_log_count ); } diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 35ee7f24..a6d1f8da 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -23,7 +23,6 @@ use futures::{Future, Stream, TryFutureExt, TryStreamExt}; use itertools::Itertools; use metrics::counter; use quinn::{RecvStream, SendStream}; -use rand::seq::SliceRandom; use rangemap::{RangeInclusiveMap, RangeInclusiveSet}; use rusqlite::{named_params, Connection}; use speedy::Writable; @@ -405,6 +404,7 @@ fn handle_need( -- [:start]---[end_version]---[:end] ( end_version BETWEEN :start AND :end ) ) + ORDER BY start_version DESC ", )?; @@ -1179,7 +1179,6 @@ pub async fn parallel_sync( let len = syncers.len(); let (readers, mut servers) = { - let mut rng = rand::thread_rng(); syncers.into_iter().fold( (Vec::with_capacity(len), Vec::with_capacity(len)), |(mut readers, mut servers), (actor_id, addr, needs, tx, read)| { @@ -1191,6 +1190,7 @@ pub async fn parallel_sync( trace!(%actor_id, "needs: {needs:?}"); + debug!(%actor_id, %addr, "needs len: {}", needs.values().map(|needs| needs.iter().map(|need| match need { SyncNeedV1::Full {versions} => (versions.end().0 - versions.start().0) as usize + 1, SyncNeedV1::Partial {..} => 0, @@ -1200,7 +1200,7 @@ pub async fn parallel_sync( let actor_needs = needs .into_iter() .flat_map(|(actor_id, needs)| { - let mut needs: Vec<_> = needs + let needs: Vec<_> = needs .into_iter() .flat_map(|need| match need { // chunk the versions, sometimes it's 0..=1000000 and that's far too big for a chunk! @@ -1213,7 +1213,6 @@ pub async fn parallel_sync( .collect(); // NOTE: IMPORTANT! shuffle the vec so we don't keep looping over the same later on - needs.shuffle(&mut rng); needs .into_iter() @@ -1265,7 +1264,7 @@ pub async fn parallel_sync( let mut drained = 0; while drained < 10 { - let (actor_id, need) = match needs.pop_front() { + let (actor_id, need) = match needs.pop_back() { Some(popped) => popped, None => { break; @@ -2018,8 +2017,8 @@ mod tests { SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { actor_id, changeset: Changeset::Full { - version: Version(2), - changes: vec![change2], + version: Version(3), + changes: vec![change3.clone()], seqs: CrsqlSeq(0)..=CrsqlSeq(0), last_seq: CrsqlSeq(0), ts, @@ -2033,8 +2032,8 @@ mod tests { SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { actor_id, changeset: Changeset::Full { - version: Version(3), - changes: vec![change3.clone()], + version: Version(2), + changes: vec![change2.clone()], seqs: CrsqlSeq(0)..=CrsqlSeq(0), last_seq: CrsqlSeq(0), ts, @@ -2186,8 +2185,8 @@ mod tests { SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { actor_id, changeset: Changeset::Full { - version: Version(3), - changes: vec![change3], + version: Version(4), + changes: vec![change4], seqs: CrsqlSeq(0)..=CrsqlSeq(0), last_seq: CrsqlSeq(0), ts, @@ -2201,8 +2200,8 @@ mod tests { SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { actor_id, changeset: Changeset::Full { - version: Version(4), - changes: vec![change4], + version: Version(3), + changes: vec![change3], seqs: CrsqlSeq(0)..=CrsqlSeq(0), last_seq: CrsqlSeq(0), ts, diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index d8e2910a..210069e7 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -531,7 +531,7 @@ pub fn runtime_loop( } } - for mut pending in to_broadcast.drain(..) { + for mut pending in to_broadcast.drain(..).rev() { trace!("{} to broadcast: {pending:?}", actor_id); let (member_count, max_transmissions) = { From 0396633e5bd9f66ed9072780dd94425994cf8395 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Fri, 1 Nov 2024 19:44:15 +0100 Subject: [PATCH 2/7] remove continue --- crates/corro-agent/src/agent/handlers.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 284002c2..3ad429b4 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -887,7 +887,6 @@ pub async fn handle_changes( if drop_log_count == 100000000 { drop_log_count = 0; } - continue; } if let Some(mut seqs) = change.seqs().cloned() { From 8f2ddcf7014c1f4a2a05b5015d3af3ea8ab2534e Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Fri, 1 Nov 2024 21:01:38 +0100 Subject: [PATCH 3/7] drop changes from back of the queue --- crates/corro-agent/src/agent/handlers.rs | 2 +- crates/corro-agent/src/api/peer.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 3ad429b4..c9e3f09a 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -865,7 +865,7 @@ pub async fn handle_changes( // drop old items when the queue is full. if queue.len() > max_queue_len { - let change = queue.pop_front(); + let change = queue.pop_back(); if let Some(change) = change { for v in change.0.versions() { let _ = seen.remove(&(change.0.actor_id, v)); diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index a6d1f8da..d8bfb70b 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1212,8 +1212,6 @@ pub async fn parallel_sync( }) .collect(); - // NOTE: IMPORTANT! shuffle the vec so we don't keep looping over the same later on - needs .into_iter() .map(|need| (actor_id, need)) From 3f95444fb58f4f4f7ac7f485df5ceaad19dfa991 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Mon, 4 Nov 2024 17:43:59 +0100 Subject: [PATCH 4/7] loadshed broadcast --- Cargo.lock | 110 ++++++++++++++++++++--- Cargo.toml | 2 +- crates/corro-agent/src/agent/handlers.rs | 38 ++------ crates/corro-agent/src/agent/util.rs | 24 ++++- crates/corro-agent/src/broadcast/mod.rs | 43 ++++++--- 5 files changed, 159 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8646ab94..e4a2a2dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1813,9 +1813,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.1" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hex" @@ -2068,7 +2068,7 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220" dependencies = [ - "hermit-abi 0.3.1", + "hermit-abi 0.3.9", "libc", "windows-sys 0.48.0", ] @@ -2097,7 +2097,7 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" dependencies = [ - "hermit-abi 0.3.1", + "hermit-abi 0.3.9", "io-lifetimes", "rustix", "windows-sys 0.48.0", @@ -2467,6 +2467,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "wasi", + "windows-sys 0.52.0", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -2513,7 +2525,7 @@ dependencies = [ "inotify", "kqueue", "libc", - "mio", + "mio 0.8.9", "walkdir", "windows-sys 0.45.0", ] @@ -2573,7 +2585,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.1", + "hermit-abi 0.3.9", "libc", ] @@ -4083,21 +4095,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" dependencies = [ "backtrace", "bytes", "libc", - "mio", - "num_cpus", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.5.5", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -4112,9 +4123,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -4838,6 +4849,15 @@ dependencies = [ "windows-targets 0.48.0", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -4868,6 +4888,22 @@ dependencies = [ "windows_x86_64_msvc 0.48.0", ] +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -4880,6 +4916,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" @@ -4892,6 +4934,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -4904,6 +4952,18 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -4916,6 +4976,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" @@ -4928,6 +4994,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -4940,6 +5012,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -4952,6 +5030,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + [[package]] name = "winreg" version = "0.10.1" diff --git a/Cargo.toml b/Cargo.toml index db49b540..58a8971e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,7 @@ strum = { version = "0.24.1", features = ["derive"] } tempfile = "3.5.0" thiserror = "1.0.40" time = { version = "0.3.15", features = ["macros", "serde-well-known"] } -tokio = { version = "1.34", features = ["full"] } +tokio = { version = "1.41", features = ["full"] } tokio-metrics = "0.3.0" tokio-serde = { version = "0.8", features = ["json"] } tokio-stream = { version = "0.1.12", features = ["sync"] } diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index c9e3f09a..39b681db 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -13,6 +13,7 @@ use std::{ time::{Duration, Instant}, }; +use crate::agent::util::log_at_pow_10; use crate::{ agent::{bi, bootstrap, uni, util, SyncClientError, ANNOUNCE_INTERVAL}, api::peer::parallel_sync, @@ -527,11 +528,12 @@ pub async fn handle_emptyset( mut rx_emptysets: CorroReceiver, mut tripwire: Tripwire, ) { - let mut buf: HashMap>, Timestamp)>> = + type EmptyQueue = VecDeque<(Vec>, Timestamp)>; + let mut buf: HashMap = HashMap::new(); let mut join_set: JoinSet< - HashMap>, Timestamp)>>, + HashMap, > = JoinSet::new(); loop { @@ -551,7 +553,7 @@ pub async fn handle_emptyset( maybe_change_src = rx_emptysets.recv() => match maybe_change_src { Some(change) => { if let Changeset::EmptySet { versions, ts } = change.changeset { - buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts)); + buf.entry(change.actor_id).or_default().push_back((versions.clone(), ts)); } else { warn!("received non-emptyset changes in emptyset channel from {}", change.actor_id); } @@ -618,9 +620,9 @@ pub async fn process_emptyset( ) -> Result<(), ChangeError> { let (versions, ts) = changes; - let mut version_iter = versions.chunks(100); + let version_iter = versions.chunks(100); - while let Some(chunk) = version_iter.next() { + for chunk in version_iter { let mut conn = agent.pool().write_low().await?; debug!("processing emptyset from {:?}", actor_id); let booked = { @@ -743,7 +745,7 @@ pub async fn handle_changes( agent.config().perf.apply_queue_timeout as u64, )); - const MAX_SEEN_CACHE_LEN: usize = 10000; + const MAX_SEEN_CACHE_LEN: usize = 1000; const KEEP_SEEN_CACHE_SIZE: usize = 1000; let mut seen: IndexMap<_, RangeInclusiveSet> = IndexMap::new(); @@ -872,21 +874,7 @@ pub async fn handle_changes( } } - drop_log_count += 1; - if is_pow_10(drop_log_count) { - if drop_log_count == 1 { - warn!("dropped an old change because changes queue is full"); - } else { - warn!( - "droppped {} old changes because changes queue is full", - drop_log_count - ); - } - } - // reset count - if drop_log_count == 100000000 { - drop_log_count = 0; - } + log_at_pow_10("dropped old change from queue", &mut drop_log_count); } if let Some(mut seqs) = change.seqs().cloned() { @@ -1170,11 +1158,3 @@ mod tests { Ok(()) } } - -#[inline] -fn is_pow_10(i: u64) -> bool { - matches!( - i, - 1 | 10 | 100 | 1000 | 10000 | 1000000 | 10000000 | 100000000 - ) -} diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index 0c1238f1..fd06cebe 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -969,7 +969,6 @@ pub async fn process_multiple_changes( .snapshot() }; - snap.update_cleared_ts(&tx, ts) .map_err(|source| ChangeError::Rusqlite { source, @@ -988,12 +987,11 @@ pub async fn process_multiple_changes( if let Some(ts) = last_cleared { let mut booked_writer = agent - .booked() - .blocking_write("process_multiple_changes(update_cleared_ts)"); + .booked() + .blocking_write("process_multiple_changes(update_cleared_ts)"); booked_writer.update_cleared_ts(ts); } - for (_, changeset, _, _) in changesets.iter() { if let Some(ts) = changeset.ts() { let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration(); @@ -1318,3 +1316,21 @@ pub fn check_buffered_meta_to_clear( conn.prepare_cached("SELECT EXISTS(SELECT 1 FROM __corro_seq_bookkeeping WHERE site_id = ? AND version >= ? AND version <= ?)")?.query_row(params![actor_id, versions.start(), versions.end()], |row| row.get(0)) } + +pub fn log_at_pow_10(msg: &str, count: &mut u64) { + if is_pow_10(*count + 1) { + warn!("{} (log count: {})", msg, count) + } + // reset count + if *count == 100000000 { + *count = 0; + } +} + +#[inline] +fn is_pow_10(i: u64) -> bool { + matches!( + i, + 1 | 10 | 100 | 1000 | 10000 | 1000000 | 10000000 | 100000000 + ) +} diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index 210069e7..bdcb90cb 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, net::SocketAddr, num::NonZeroU32, pin::Pin, @@ -26,7 +26,7 @@ use speedy::Writable; use strum::EnumDiscriminants; use tokio::{ sync::mpsc, - task::{block_in_place, LocalSet}, + task::{block_in_place, JoinSet, LocalSet}, time::interval, }; use tokio_stream::StreamExt; @@ -41,7 +41,7 @@ use corro_types::{ channel::{bounded, CorroReceiver, CorroSender}, }; -use crate::transport::Transport; +use crate::{agent::util::log_at_pow_10, transport::Transport}; #[derive(Clone)] struct TimerSpawner { @@ -374,6 +374,10 @@ pub fn runtime_loop( } }); + let mut join_set = JoinSet::new(); + let max_queue_len = agent.config().perf.processing_queue_len; + const MAX_INFLIGHT_BROADCAST: usize = 10000; + tokio::spawn(async move { const BROADCAST_CUTOFF: usize = 64 * 1024; @@ -406,7 +410,8 @@ pub fn runtime_loop( let mut tripped = false; let mut ser_buf = BytesMut::new(); - let mut to_broadcast = vec![]; + let mut to_broadcast = VecDeque::new(); + let mut log_count = 0; loop { let branch = tokio::select! { @@ -447,10 +452,10 @@ pub fn runtime_loop( } Branch::BroadcastTick => { if !bcast_buf.is_empty() { - to_broadcast.push(PendingBroadcast::new(bcast_buf.split().freeze())); + to_broadcast.push_front(PendingBroadcast::new(bcast_buf.split().freeze())); } if !local_bcast_buf.is_empty() { - to_broadcast.push(PendingBroadcast::new_local( + to_broadcast.push_front(PendingBroadcast::new_local( local_bcast_buf.split().freeze(), )); } @@ -501,7 +506,7 @@ pub fn runtime_loop( } if local_bcast_buf.len() >= BROADCAST_CUTOFF { - to_broadcast.push(PendingBroadcast::new_local( + to_broadcast.push_front(PendingBroadcast::new_local( local_bcast_buf.split().freeze(), )); } @@ -514,13 +519,14 @@ pub fn runtime_loop( } if bcast_buf.len() >= BROADCAST_CUTOFF { - to_broadcast.push(PendingBroadcast::new(bcast_buf.split().freeze())); + to_broadcast + .push_front(PendingBroadcast::new(bcast_buf.split().freeze())); } } } Branch::WokePendingBroadcast(pending) => { trace!("handling Branch::WokePendingBroadcast"); - to_broadcast.push(pending); + to_broadcast.push_front(pending); } Branch::Metrics => { trace!("handling Branch::Metrics"); @@ -531,7 +537,10 @@ pub fn runtime_loop( } } - for mut pending in to_broadcast.drain(..).rev() { + while join_set.try_join_next().is_some() {} + + while !to_broadcast.is_empty() && join_set.len() < MAX_INFLIGHT_BROADCAST { + let mut pending = to_broadcast.pop_front().unwrap(); trace!("{} to broadcast: {pending:?}", actor_id); let (member_count, max_transmissions) = { @@ -574,7 +583,7 @@ pub fn runtime_loop( for addr in broadcast_to { debug!(actor = %actor_id, "broadcasting {} bytes to: {addr}", pending.payload.len()); - tokio::spawn(transmit_broadcast( + join_set.spawn(transmit_broadcast( pending.payload.clone(), transport.clone(), addr, @@ -597,6 +606,18 @@ pub fn runtime_loop( } } } + + // if broadcast queue is over the max, drop the oldest, most sent item + if to_broadcast.len() > max_queue_len { + let max_sent = to_broadcast + .iter() + .enumerate() + .max_by_key(|(_, val)| val.send_count); + if let Some((i, _)) = max_sent { + to_broadcast.remove(i); + log_at_pow_10("dropped broadcast from queue", &mut log_count); + }; + } } info!("broadcasts are done"); }); From a9e861d1bcd35d94678b30bf6772cf2a4f263133 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Mon, 4 Nov 2024 17:52:41 +0100 Subject: [PATCH 5/7] revert change to cache len --- crates/corro-agent/src/agent/handlers.rs | 2 +- crates/corro-agent/src/broadcast/mod.rs | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 39b681db..bd88cfc8 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -745,7 +745,7 @@ pub async fn handle_changes( agent.config().perf.apply_queue_timeout as u64, )); - const MAX_SEEN_CACHE_LEN: usize = 1000; + const MAX_SEEN_CACHE_LEN: usize = 10000; const KEEP_SEEN_CACHE_SIZE: usize = 1000; let mut seen: IndexMap<_, RangeInclusiveSet> = IndexMap::new(); diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index bdcb90cb..b579a949 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -374,10 +374,6 @@ pub fn runtime_loop( } }); - let mut join_set = JoinSet::new(); - let max_queue_len = agent.config().perf.processing_queue_len; - const MAX_INFLIGHT_BROADCAST: usize = 10000; - tokio::spawn(async move { const BROADCAST_CUTOFF: usize = 64 * 1024; @@ -410,6 +406,9 @@ pub fn runtime_loop( let mut tripped = false; let mut ser_buf = BytesMut::new(); + let mut join_set = JoinSet::new(); + let max_queue_len = agent.config().perf.processing_queue_len; + const MAX_INFLIGHT_BROADCAST: usize = 10000; let mut to_broadcast = VecDeque::new(); let mut log_count = 0; From ce517432196718091dd65dd0e2bb2499487bd5f2 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Wed, 6 Nov 2024 16:33:34 +0100 Subject: [PATCH 6/7] test order of broadcast and sync changes --- crates/corro-agent/src/agent/handlers.rs | 2 +- crates/corro-agent/src/agent/mod.rs | 1 + crates/corro-agent/src/agent/uni.rs | 36 +- crates/corro-agent/src/api/peer.rs | 60 +++ crates/corro-agent/src/broadcast/mod.rs | 521 ++++++++++++++--------- crates/corro-pg/src/lib.rs | 2 +- crates/corro-tests/src/lib.rs | 4 +- crates/corro-types/src/pubsub.rs | 4 +- 8 files changed, 395 insertions(+), 235 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index bd88cfc8..664a90c6 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -119,7 +119,7 @@ pub fn spawn_incoming_connection_handlers( // Spawn handler tasks for this connection spawn_foca_handler(&agent, &tripwire, &conn); - uni::spawn_unipayload_handler(&tripwire, &conn, agent.clone()); + uni::spawn_unipayload_handler(&tripwire, &conn, agent.cluster_id(), agent.tx_changes().clone()); bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn); }); } diff --git a/crates/corro-agent/src/agent/mod.rs b/crates/corro-agent/src/agent/mod.rs index f8e5e3b1..186e122d 100644 --- a/crates/corro-agent/src/agent/mod.rs +++ b/crates/corro-agent/src/agent/mod.rs @@ -28,6 +28,7 @@ pub use error::{SyncClientError, SyncRecvError}; pub use run_root::start_with_config; pub use setup::{setup, AgentOptions}; pub use util::process_multiple_changes; +pub use uni::spawn_unipayload_handler; pub const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300); #[cfg(test)] diff --git a/crates/corro-agent/src/agent/uni.rs b/crates/corro-agent/src/agent/uni.rs index 2c2ab073..753acc28 100644 --- a/crates/corro-agent/src/agent/uni.rs +++ b/crates/corro-agent/src/agent/uni.rs @@ -1,6 +1,5 @@ use corro_types::{ - agent::Agent, - broadcast::{BroadcastV1, ChangeSource, UniPayload, UniPayloadV1}, + actor::ClusterId, broadcast::{BroadcastV1, ChangeSource, ChangeV1, UniPayload, UniPayloadV1}, channel::CorroSender }; use metrics::counter; use speedy::Readable; @@ -11,7 +10,7 @@ use tripwire::Tripwire; /// Spawn a task that accepts unidirectional broadcast streams, then /// spawns another task for each incoming stream to handle. -pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, agent: Agent) { +pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, cluster_id: ClusterId, tx_changes: CorroSender<(ChangeV1, ChangeSource)>) { tokio::spawn({ let conn = conn.clone(); let mut tripwire = tripwire.clone(); @@ -39,7 +38,7 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a ); tokio::spawn({ - let agent = agent.clone(); + let tx_changes = tx_changes.clone(); async move { let mut framed = FramedRead::new( rx, @@ -48,6 +47,7 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a .new_codec(), ); + let mut changes = vec![]; loop { match StreamExt::next(&mut framed).await { Some(Ok(b)) => { @@ -57,27 +57,19 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a Ok(payload) => { trace!("parsed a payload: {payload:?}"); + match payload { UniPayload::V1 { data: UniPayloadV1::Broadcast(BroadcastV1::Change( change, )), - cluster_id, + cluster_id: payload_cluster_id, } => { - if cluster_id != agent.cluster_id() { + if cluster_id != payload_cluster_id { continue; } - if let Err(e) = agent - .tx_changes() - .send((change, ChangeSource::Broadcast)) - .await - { - error!( - "could not send change for processing: {e}" - ); - return; - } + changes.push((change, ChangeSource::Broadcast)); } } } @@ -93,6 +85,18 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a None => break, } } + + for change in changes.into_iter().rev() { + if let Err(e) = tx_changes + .send(change) + .await + { + error!( + "could not send change for processing: {e}" + ); + return; + } + } } }); } diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index d8bfb70b..f8292bc6 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1717,9 +1717,12 @@ pub async fn serve_sync( #[cfg(test)] mod tests { + use crate::api::public::api_v1_transactions; use axum::{Extension, Json}; use camino::Utf8PathBuf; + use corro_tests::launch_test_agent; use corro_tests::TEST_SCHEMA; + use corro_types::api::Statement; use corro_types::{ api::{ColumnName, TableName}, base::CrsqlDbVersion, @@ -1739,6 +1742,63 @@ mod tests { use super::*; + #[tokio::test(flavor = "multi_thread")] + async fn test_sync_changes_order() -> eyre::Result<()> { + _ = tracing_subscriber::fmt::try_init(); + + let (tripwire, _tripwire_worker, _tripwire_tx) = Tripwire::new_simple(); + + let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?; + + // create versions on the first node + let versions_range = 1..=100; + for i in versions_range.clone() { + let (status_code, body) = api_v1_transactions( + Extension(ta1.agent.clone()), + axum::Json(vec![Statement::WithParams( + "INSERT OR REPLACE INTO testsblob (id,text) VALUES (?,?)".into(), + vec![format!("service-id-{i}").into(), "service-name".into()], + )]), + ) + .await; + assert_eq!(status_code, StatusCode::OK); + + let version = body.0.version.unwrap(); + assert_eq!(version, Version(i)); + } + + let dir = tempfile::tempdir()?; + + let (ta2_agent, mut ta2_opts) = setup( + Config::builder() + .db_path(dir.path().join("corrosion.db").display().to_string()) + .gossip_addr("127.0.0.1:0".parse()?) + .api_addr("127.0.0.1:0".parse()?) + .build()?, + tripwire, + ) + .await?; + + let members = vec![(ta1.agent.actor_id(), ta1.agent.gossip_addr())]; + let _ = parallel_sync( + &ta2_agent, + &ta2_opts.transport, + members, + Default::default(), + HashMap::new(), + ) + .await?; + + for i in versions_range.rev() { + let changes = tokio::time::timeout(Duration::from_secs(5), ta2_opts.rx_changes.recv()) + .await? + .unwrap(); + assert_eq!(changes.0.versions(), Version(i)..=Version(i)); + } + + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_handle_need() -> eyre::Result<()> { _ = tracing_subscriber::fmt::try_init(); diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index b579a949..d9459e6e 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -122,14 +122,13 @@ pub fn runtime_loop( agent: Agent, transport: Transport, mut rx_foca: CorroReceiver, - mut rx_bcast: CorroReceiver, + rx_bcast: CorroReceiver, to_send_tx: CorroSender<(Actor, Bytes)>, notifications_tx: CorroSender>, - mut tripwire: Tripwire, + tripwire: Tripwire, ) { debug!("starting runtime loop for actor: {actor:?}"); let rng = StdRng::from_entropy(); - let actor_id = actor.id(); let config = Arc::new(RwLock::new(make_foca_config(1.try_into().unwrap()))); @@ -374,252 +373,261 @@ pub fn runtime_loop( } }); - tokio::spawn(async move { - const BROADCAST_CUTOFF: usize = 64 * 1024; + tokio::spawn(handle_broadcasts( + agent, rx_bcast, transport, config, tripwire, + )); +} - let mut bcast_codec = LengthDelimitedCodec::builder() - .max_frame_length(100 * 1_024 * 1_024) - .new_codec(); +async fn handle_broadcasts( + agent: Agent, + mut rx_bcast: CorroReceiver, + transport: Transport, + config: Arc>, + mut tripwire: Tripwire, +) { + let actor_id = agent.actor_id(); + const BROADCAST_CUTOFF: usize = 64 * 1024; - let mut bcast_buf = BytesMut::new(); - let mut local_bcast_buf = BytesMut::new(); - let mut single_bcast_buf = BytesMut::new(); + let mut bcast_codec = LengthDelimitedCodec::builder() + .max_frame_length(100 * 1_024 * 1_024) + .new_codec(); - let mut metrics_interval = interval(Duration::from_secs(10)); + let mut bcast_buf = BytesMut::new(); + let mut local_bcast_buf = BytesMut::new(); + let mut single_bcast_buf = BytesMut::new(); - let mut rng = StdRng::from_entropy(); + let mut metrics_interval = interval(Duration::from_secs(10)); - let mut idle_pendings = FuturesUnordered::< - Pin + Send + 'static>>, - >::new(); + let mut rng = StdRng::from_entropy(); - let mut bcast_interval = interval(Duration::from_millis(500)); + let mut idle_pendings = + FuturesUnordered:: + Send + 'static>>>::new(); - enum Branch { - Broadcast(BroadcastInput), - BroadcastTick, - WokePendingBroadcast(PendingBroadcast), - Tripped, - Metrics, - } + let mut bcast_interval = interval(Duration::from_millis(500)); - let mut tripped = false; - let mut ser_buf = BytesMut::new(); - - let mut join_set = JoinSet::new(); - let max_queue_len = agent.config().perf.processing_queue_len; - const MAX_INFLIGHT_BROADCAST: usize = 10000; - let mut to_broadcast = VecDeque::new(); - let mut log_count = 0; - - loop { - let branch = tokio::select! { - biased; - input = rx_bcast.recv() => match input { - Some(input) => { - Branch::Broadcast(input) - }, - None => { - warn!("no more swim inputs"); - break; - } - }, - _ = bcast_interval.tick() => { - Branch::BroadcastTick - }, - maybe_woke = idle_pendings.next(), if !idle_pendings.is_terminated() => match maybe_woke { - Some(woke) => Branch::WokePendingBroadcast(woke), - None => { - trace!("idle pendings returned None"); - // I guess? - continue; - } - }, + enum Branch { + Broadcast(BroadcastInput), + BroadcastTick, + WokePendingBroadcast(PendingBroadcast), + Tripped, + Metrics, + } - _ = &mut tripwire, if !tripped => { - tripped = true; - Branch::Tripped + let mut tripped = false; + let mut ser_buf = BytesMut::new(); + + let mut join_set = JoinSet::new(); + let max_queue_len = agent.config().perf.processing_queue_len; + const MAX_INFLIGHT_BROADCAST: usize = 10000; + let mut to_broadcast = VecDeque::new(); + let mut log_count = 0; + + loop { + let branch = tokio::select! { + biased; + input = rx_bcast.recv() => match input { + Some(input) => { + Branch::Broadcast(input) }, - _ = metrics_interval.tick() => { - Branch::Metrics + None => { + warn!("no more swim inputs"); + break; } - }; + }, + _ = bcast_interval.tick() => { + Branch::BroadcastTick + }, + maybe_woke = idle_pendings.next(), if !idle_pendings.is_terminated() => match maybe_woke { + Some(woke) => Branch::WokePendingBroadcast(woke), + None => { + trace!("idle pendings returned None"); + // I guess? + continue; + } + }, + + _ = &mut tripwire, if !tripped => { + tripped = true; + Branch::Tripped + }, + _ = metrics_interval.tick() => { + Branch::Metrics + } + }; - match branch { - Branch::Tripped => { - // nothing to do here, yet! + match branch { + Branch::Tripped => { + // nothing to do here, yet! + } + Branch::BroadcastTick => { + if !bcast_buf.is_empty() { + to_broadcast.push_front(PendingBroadcast::new(bcast_buf.split().freeze())); } - Branch::BroadcastTick => { - if !bcast_buf.is_empty() { - to_broadcast.push_front(PendingBroadcast::new(bcast_buf.split().freeze())); - } - if !local_bcast_buf.is_empty() { - to_broadcast.push_front(PendingBroadcast::new_local( - local_bcast_buf.split().freeze(), - )); - } + if !local_bcast_buf.is_empty() { + to_broadcast.push_front(PendingBroadcast::new_local( + local_bcast_buf.split().freeze(), + )); } - Branch::Broadcast(input) => { - trace!("handling Branch::Broadcast"); - let (bcast, is_local) = match input { - BroadcastInput::Rebroadcast(bcast) => (bcast, false), - BroadcastInput::AddBroadcast(bcast) => (bcast, true), - }; - trace!("adding broadcast: {bcast:?}, local? {is_local}"); - - if let Err(e) = (UniPayload::V1 { - data: UniPayloadV1::Broadcast(bcast.clone()), - cluster_id: agent.cluster_id(), - }) - .write_to_stream((&mut ser_buf).writer()) + } + Branch::Broadcast(input) => { + trace!("handling Branch::Broadcast"); + let (bcast, is_local) = match input { + BroadcastInput::Rebroadcast(bcast) => (bcast, false), + BroadcastInput::AddBroadcast(bcast) => (bcast, true), + }; + trace!("adding broadcast: {bcast:?}, local? {is_local}"); + + if let Err(e) = (UniPayload::V1 { + data: UniPayloadV1::Broadcast(bcast.clone()), + cluster_id: agent.cluster_id(), + }) + .write_to_stream((&mut ser_buf).writer()) + { + error!("could not encode UniPayload::V1 Broadcast: {e}"); + ser_buf.clear(); + continue; + } + trace!("ser buf len: {}", ser_buf.len()); + + if is_local { + if let Err(e) = + bcast_codec.encode(ser_buf.split().freeze(), &mut single_bcast_buf) { - error!("could not encode UniPayload::V1 Broadcast: {e}"); - ser_buf.clear(); + error!("could not encode local broadcast: {e}"); + single_bcast_buf.clear(); continue; } - trace!("ser buf len: {}", ser_buf.len()); - if is_local { - if let Err(e) = - bcast_codec.encode(ser_buf.split().freeze(), &mut single_bcast_buf) - { - error!("could not encode local broadcast: {e}"); - single_bcast_buf.clear(); - continue; - } - - let payload = single_bcast_buf.split().freeze(); + let payload = single_bcast_buf.split().freeze(); - local_bcast_buf.extend_from_slice(&payload); - - { - let members = agent.members().read(); - for addr in members.ring0(agent.cluster_id()) { - // this spawns, so we won't be holding onto the read lock for long - tokio::spawn(transmit_broadcast( - payload.clone(), - transport.clone(), - addr, - )); - } - } + local_bcast_buf.extend_from_slice(&payload); - if local_bcast_buf.len() >= BROADCAST_CUTOFF { - to_broadcast.push_front(PendingBroadcast::new_local( - local_bcast_buf.split().freeze(), + { + let members = agent.members().read(); + for addr in members.ring0(agent.cluster_id()) { + // this spawns, so we won't be holding onto the read lock for long + tokio::spawn(transmit_broadcast( + payload.clone(), + transport.clone(), + addr, )); } - } else { - if let Err(e) = bcast_codec.encode(ser_buf.split().freeze(), &mut bcast_buf) - { - error!("could not encode broadcast: {e}"); - bcast_buf.clear(); - continue; - } + } - if bcast_buf.len() >= BROADCAST_CUTOFF { - to_broadcast - .push_front(PendingBroadcast::new(bcast_buf.split().freeze())); - } + if local_bcast_buf.len() >= BROADCAST_CUTOFF { + to_broadcast.push_front(PendingBroadcast::new_local( + local_bcast_buf.split().freeze(), + )); + } + } else { + if let Err(e) = bcast_codec.encode(ser_buf.split().freeze(), &mut bcast_buf) { + error!("could not encode broadcast: {e}"); + bcast_buf.clear(); + continue; + } + + if bcast_buf.len() >= BROADCAST_CUTOFF { + to_broadcast.push_front(PendingBroadcast::new(bcast_buf.split().freeze())); } } - Branch::WokePendingBroadcast(pending) => { - trace!("handling Branch::WokePendingBroadcast"); - to_broadcast.push_front(pending); - } - Branch::Metrics => { - trace!("handling Branch::Metrics"); - gauge!("corro.broadcast.pending.count").set(idle_pendings.len() as f64); - gauge!("corro.broadcast.buffer.capacity").set(bcast_buf.capacity() as f64); - gauge!("corro.broadcast.serialization.buffer.capacity") - .set(ser_buf.capacity() as f64); - } } + Branch::WokePendingBroadcast(pending) => { + trace!("handling Branch::WokePendingBroadcast"); + to_broadcast.push_front(pending); + } + Branch::Metrics => { + trace!("handling Branch::Metrics"); + gauge!("corro.broadcast.pending.count").set(idle_pendings.len() as f64); + gauge!("corro.broadcast.buffer.capacity").set(bcast_buf.capacity() as f64); + gauge!("corro.broadcast.serialization.buffer.capacity") + .set(ser_buf.capacity() as f64); + } + } - while join_set.try_join_next().is_some() {} - - while !to_broadcast.is_empty() && join_set.len() < MAX_INFLIGHT_BROADCAST { - let mut pending = to_broadcast.pop_front().unwrap(); - trace!("{} to broadcast: {pending:?}", actor_id); - - let (member_count, max_transmissions) = { - let config = config.read(); - let members = agent.members().read(); - let count = members.states.len(); - let ring0_count = members.ring0(agent.cluster_id()).count(); - let max_transmissions = config.max_transmissions.get(); - ( - std::cmp::max( - config.num_indirect_probes.get(), - (count - ring0_count) / (max_transmissions as usize * 10), - ), - max_transmissions, - ) - }; + while join_set.try_join_next().is_some() {} + + while !to_broadcast.is_empty() && join_set.len() < MAX_INFLIGHT_BROADCAST { + let mut pending = to_broadcast.pop_front().unwrap(); + trace!("{} to broadcast: {pending:?}", actor_id); + + let (member_count, max_transmissions) = { + let config = config.read(); + let members = agent.members().read(); + let count = members.states.len(); + let ring0_count = members.ring0(agent.cluster_id()).count(); + let max_transmissions = config.max_transmissions.get(); + ( + std::cmp::max( + config.num_indirect_probes.get(), + (count - ring0_count) / (max_transmissions as usize * 10), + ), + max_transmissions, + ) + }; - let broadcast_to = { - agent - .members() - .read() - .states - .iter() - .filter_map(|(member_id, state)| { - // don't broadcast to ourselves... or ring0 if local broadcast - if *member_id == actor_id - || state.cluster_id != agent.cluster_id() - || (pending.is_local && state.is_ring0()) - || pending.sent_to.contains(&state.addr) - // don't broadcast to this peer - { - None - } else { - Some(state.addr) - } - }) - .choose_multiple(&mut rng, member_count) - }; + let broadcast_to = { + agent + .members() + .read() + .states + .iter() + .filter_map(|(member_id, state)| { + // don't broadcast to ourselves... or ring0 if local broadcast + if *member_id == actor_id + || state.cluster_id != agent.cluster_id() + || (pending.is_local && state.is_ring0()) + || pending.sent_to.contains(&state.addr) + // don't broadcast to this peer + { + None + } else { + Some(state.addr) + } + }) + .choose_multiple(&mut rng, member_count) + }; - for addr in broadcast_to { - debug!(actor = %actor_id, "broadcasting {} bytes to: {addr}", pending.payload.len()); + for addr in broadcast_to { + debug!(actor = %actor_id, "broadcasting {} bytes to: {addr}", pending.payload.len()); - join_set.spawn(transmit_broadcast( - pending.payload.clone(), - transport.clone(), - addr, - )); + join_set.spawn(transmit_broadcast( + pending.payload.clone(), + transport.clone(), + addr, + )); - pending.sent_to.insert(addr); - } + pending.sent_to.insert(addr); + } - if let Some(send_count) = pending.send_count.checked_add(1) { - trace!("send_count: {send_count}, max_transmissions: {max_transmissions}"); - pending.send_count = send_count; - - if send_count < max_transmissions { - debug!("queueing for re-send"); - idle_pendings.push(Box::pin(async move { - // FIXME: calculate sleep duration based on send count - tokio::time::sleep(Duration::from_millis(500)).await; - pending - })); - } + if let Some(send_count) = pending.send_count.checked_add(1) { + trace!("send_count: {send_count}, max_transmissions: {max_transmissions}"); + pending.send_count = send_count; + + if send_count < max_transmissions { + debug!("queueing for re-send"); + idle_pendings.push(Box::pin(async move { + // FIXME: calculate sleep duration based on send count + tokio::time::sleep(Duration::from_millis(500)).await; + pending + })); } } + } - // if broadcast queue is over the max, drop the oldest, most sent item - if to_broadcast.len() > max_queue_len { - let max_sent = to_broadcast - .iter() - .enumerate() - .max_by_key(|(_, val)| val.send_count); - if let Some((i, _)) = max_sent { - to_broadcast.remove(i); - log_at_pow_10("dropped broadcast from queue", &mut log_count); - }; - } + // if broadcast queue is over the max, drop the oldest, most sent item + if to_broadcast.len() > max_queue_len { + let max_sent = to_broadcast + .iter() + .enumerate() + .max_by_key(|(_, val)| val.send_count); + if let Some((i, _)) = max_sent { + to_broadcast.remove(i); + log_at_pow_10("dropped broadcast from queue", &mut log_count); + }; } - info!("broadcasts are done"); - }); + } + + info!("broadcasts are done"); } fn diff_member_states( @@ -812,3 +820,90 @@ async fn transmit_broadcast(payload: Bytes, transport: Transport, addr: SocketAd } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::agent::spawn_unipayload_handler; + use corro_tests::launch_test_agent; + use corro_types::{ + base::{CrsqlSeq, Version}, + broadcast::{BroadcastV1, ChangeV1, Changeset}, + }; + use uuid::Uuid; + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_broadcast_order() -> eyre::Result<()> { + tracing_subscriber::fmt() + .with_ansi(false) + .with_max_level(tracing::Level::DEBUG) + .init(); + let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple(); + let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?; + + let (tx_bcast, rx_bcast) = bounded(100, "bcast"); + let (tx_rtt, _) = mpsc::channel(100); + + let config = Arc::new(RwLock::new(make_foca_config(1.try_into().unwrap()))); + let transport = Transport::new(&ta1.config.gossip, tx_rtt).await?; + + let server_config = quinn_plaintext::server_config(); + let endpoint = quinn::Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())?; + let ta2_gossip_addr = endpoint.local_addr()?; + println!("listening on {ta2_gossip_addr}"); + + let ta2_actor = Actor::new( + ActorId(Uuid::new_v4()), + ta2_gossip_addr, + Default::default(), + ta1.agent.cluster_id(), + ); + ta1.agent.members().write().add_member(&ta2_actor); + + tokio::spawn(handle_broadcasts( + ta1.agent.clone(), + rx_bcast, + transport, + config, + tripwire.clone(), + )); + + let actor_id = ta1.agent.actor_id(); + for i in 0..5 { + tx_bcast + .send(BroadcastInput::Rebroadcast(BroadcastV1::Change(ChangeV1 { + actor_id, + changeset: Changeset::Full { + version: Version(i), + changes: vec![], + seqs: CrsqlSeq(0)..=CrsqlSeq(0), + last_seq: CrsqlSeq(0), + ts: Default::default(), + }, + }))) + .await?; + } + + if let Some(conn) = endpoint.accept().await { + info!("accepting connection"); + let conn = conn.await.unwrap(); + + let (tx_changes, mut rx_changes) = bounded(100, "changes"); + spawn_unipayload_handler(&tripwire, &conn, ta1.agent.cluster_id(), tx_changes); + + // we should receive five items starting from the biggest version + for i in (0..5).rev() { + let changes = tokio::time::timeout(Duration::from_secs(5), rx_changes.recv()) + .await? + .unwrap(); + assert_eq!(changes.0.versions(), Version(i)..=Version(i)); + } + } + + tripwire_tx.send(()).await.ok(); + tripwire_worker.await; + spawn::wait_for_all_pending_handles().await; + + Ok(()) + } +} diff --git a/crates/corro-pg/src/lib.rs b/crates/corro-pg/src/lib.rs index 9b0caf4a..68c225be 100644 --- a/crates/corro-pg/src/lib.rs +++ b/crates/corro-pg/src/lib.rs @@ -982,7 +982,7 @@ pub async fn start( let fields = match field_types( stmt, cmd, - FieldFormats::Each(&result_formats), + FieldFormats::Each(result_formats), ) { Ok(fields) => fields, Err(e) => { diff --git a/crates/corro-tests/src/lib.rs b/crates/corro-tests/src/lib.rs index 480007b3..3aa223ac 100644 --- a/crates/corro-tests/src/lib.rs +++ b/crates/corro-tests/src/lib.rs @@ -57,6 +57,7 @@ pub struct TestAgent { pub agent: Agent, pub bookie: Bookie, pub tmpdir: Arc, + pub config: Config, } pub async fn launch_test_agent Result>( @@ -78,7 +79,7 @@ pub async fn launch_test_agent Result Result &String { - return &self.new_query; - } + pub fn new_query(&self) -> &String { &self.new_query } } const CHANGE_ID_COL: &str = "id"; From 99772469c4641c5239602e2dcda47533f61b1c09 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Wed, 6 Nov 2024 20:12:10 +0100 Subject: [PATCH 7/7] test loadshedding in broadcast queue --- crates/corro-agent/src/agent/handlers.rs | 38 ++++++------ crates/corro-agent/src/broadcast/mod.rs | 79 +++++++++++++++++++++--- 2 files changed, 89 insertions(+), 28 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 664a90c6..330bcb93 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -119,7 +119,12 @@ pub fn spawn_incoming_connection_handlers( // Spawn handler tasks for this connection spawn_foca_handler(&agent, &tripwire, &conn); - uni::spawn_unipayload_handler(&tripwire, &conn, agent.cluster_id(), agent.tx_changes().clone()); + uni::spawn_unipayload_handler( + &tripwire, + &conn, + agent.cluster_id(), + agent.tx_changes().clone(), + ); bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn); }); } @@ -529,12 +534,9 @@ pub async fn handle_emptyset( mut tripwire: Tripwire, ) { type EmptyQueue = VecDeque<(Vec>, Timestamp)>; - let mut buf: HashMap = - HashMap::new(); + let mut buf: HashMap = HashMap::new(); - let mut join_set: JoinSet< - HashMap, - > = JoinSet::new(); + let mut join_set: JoinSet> = JoinSet::new(); loop { tokio::select! { @@ -865,18 +867,6 @@ pub async fn handle_changes( continue; } - // drop old items when the queue is full. - if queue.len() > max_queue_len { - let change = queue.pop_back(); - if let Some(change) = change { - for v in change.0.versions() { - let _ = seen.remove(&(change.0.actor_id, v)); - } - } - - log_at_pow_10("dropped old change from queue", &mut drop_log_count); - } - if let Some(mut seqs) = change.seqs().cloned() { let v = *change.versions().start(); if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) { @@ -927,6 +917,18 @@ pub async fn handle_changes( } } + // drop old items when the queue is full. + if queue.len() > max_queue_len { + let change = queue.pop_back(); + if let Some(change) = change { + for v in change.0.versions() { + let _ = seen.remove(&(change.0.actor_id, v)); + } + } + + log_at_pow_10("dropped old change from queue", &mut drop_log_count); + } + if let Some(recv_lag) = recv_lag { let src_str: &'static str = src.into(); histogram!("corro.agent.changes.recv.lag.seconds", "source" => src_str) diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index d9459e6e..bfc18c95 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -614,22 +614,32 @@ async fn handle_broadcasts( } } - // if broadcast queue is over the max, drop the oldest, most sent item - if to_broadcast.len() > max_queue_len { - let max_sent = to_broadcast - .iter() - .enumerate() - .max_by_key(|(_, val)| val.send_count); - if let Some((i, _)) = max_sent { - to_broadcast.remove(i); - log_at_pow_10("dropped broadcast from queue", &mut log_count); - }; + if drop_oldest_broadcast(&mut to_broadcast, max_queue_len).is_some() { + log_at_pow_10("dropped old change from queue", &mut log_count); } } info!("broadcasts are done"); } +// Drop the oldest, most sent item +fn drop_oldest_broadcast( + queue: &mut VecDeque, + max: usize, +) -> Option { + if queue.len() > max { + let max_sent: Option<(_, _)> = queue + .iter() + .enumerate() + .max_by_key(|(_, val)| val.send_count); + if let Some((i, _)) = max_sent { + return queue.remove(i); + } + } + + None +} + fn diff_member_states( agent: &Agent, foca: &Foca, StdRng, NoCustomBroadcast>, @@ -832,6 +842,55 @@ mod tests { }; use uuid::Uuid; + #[test] + fn test_behaviour_when_queue_is_full() -> eyre::Result<()> { + let max = 4; + let mut queue = VecDeque::new(); + + assert!(drop_oldest_broadcast(&mut queue, max).is_none()); + + + queue.push_front(build_broadcast(1, 0)); + queue.push_front(build_broadcast(2, 3)); + queue.push_front(build_broadcast(3, 1)); + queue.push_front(build_broadcast(4, 1)); + queue.push_front(build_broadcast(5, 2)); + queue.push_front(build_broadcast(6, 1)); + queue.push_front(build_broadcast(7, 3)); + queue.push_front(build_broadcast(8, 0)); + + // drop oldest item with highest send count + let dropped = drop_oldest_broadcast(&mut queue, max).unwrap(); + assert_eq!(dropped.send_count, 3); + assert_eq!(2_i64.to_be_bytes(), dropped.payload.as_ref()); + + let dropped = drop_oldest_broadcast(&mut queue, max).unwrap(); + assert_eq!(dropped.send_count, 3); + assert_eq!(7_i64.to_be_bytes(), dropped.payload.as_ref()); + + let dropped = drop_oldest_broadcast(&mut queue, max).unwrap(); + assert_eq!(dropped.send_count, 2); + assert_eq!(5_i64.to_be_bytes(), dropped.payload.as_ref()); + + let dropped = drop_oldest_broadcast(&mut queue, max).unwrap(); + assert_eq!(dropped.send_count, 1); + assert_eq!(3_i64.to_be_bytes(), dropped.payload.as_ref()); + + // queue is still at max now, no item gets dropped + assert!(drop_oldest_broadcast(&mut queue, max).is_none()); + + Ok(()) + } + + fn build_broadcast(id: u64, send_count: u8) -> PendingBroadcast { + PendingBroadcast { + payload: Bytes::copy_from_slice(&id.to_be_bytes()), + is_local: false, + send_count, + sent_to: HashSet::new(), + } + } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_broadcast_order() -> eyre::Result<()> { tracing_subscriber::fmt()