diff --git a/Cargo.lock b/Cargo.lock index 8646ab94..e4a2a2dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1813,9 +1813,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.1" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hex" @@ -2068,7 +2068,7 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220" dependencies = [ - "hermit-abi 0.3.1", + "hermit-abi 0.3.9", "libc", "windows-sys 0.48.0", ] @@ -2097,7 +2097,7 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" dependencies = [ - "hermit-abi 0.3.1", + "hermit-abi 0.3.9", "io-lifetimes", "rustix", "windows-sys 0.48.0", @@ -2467,6 +2467,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "wasi", + "windows-sys 0.52.0", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -2513,7 +2525,7 @@ dependencies = [ "inotify", "kqueue", "libc", - "mio", + "mio 0.8.9", "walkdir", "windows-sys 0.45.0", ] @@ -2573,7 +2585,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.1", + "hermit-abi 0.3.9", "libc", ] @@ -4083,21 +4095,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" dependencies = [ "backtrace", "bytes", "libc", - "mio", - "num_cpus", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.5.5", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -4112,9 +4123,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -4838,6 +4849,15 @@ dependencies = [ "windows-targets 0.48.0", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -4868,6 +4888,22 @@ dependencies = [ "windows_x86_64_msvc 0.48.0", ] +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -4880,6 +4916,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" @@ -4892,6 +4934,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -4904,6 +4952,18 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -4916,6 +4976,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" @@ -4928,6 +4994,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -4940,6 +5012,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -4952,6 +5030,12 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + [[package]] name = "winreg" version = "0.10.1" diff --git a/Cargo.toml b/Cargo.toml index db49b540..58a8971e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,7 @@ strum = { version = "0.24.1", features = ["derive"] } tempfile = "3.5.0" thiserror = "1.0.40" time = { version = "0.3.15", features = ["macros", "serde-well-known"] } -tokio = { version = "1.34", features = ["full"] } +tokio = { version = "1.41", features = ["full"] } tokio-metrics = "0.3.0" tokio-serde = { version = "0.8", features = ["json"] } tokio-stream = { version = "0.1.12", features = ["sync"] } diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index c9e3f09a..39b681db 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -13,6 +13,7 @@ use std::{ time::{Duration, Instant}, }; +use crate::agent::util::log_at_pow_10; use crate::{ agent::{bi, bootstrap, uni, util, SyncClientError, ANNOUNCE_INTERVAL}, api::peer::parallel_sync, @@ -527,11 +528,12 @@ pub async fn handle_emptyset( mut rx_emptysets: CorroReceiver, mut tripwire: Tripwire, ) { - let mut buf: HashMap>, Timestamp)>> = + type EmptyQueue = VecDeque<(Vec>, Timestamp)>; + let mut buf: HashMap = HashMap::new(); let mut join_set: JoinSet< - HashMap>, Timestamp)>>, + HashMap, > = JoinSet::new(); loop { @@ -551,7 +553,7 @@ pub async fn handle_emptyset( maybe_change_src = rx_emptysets.recv() => match maybe_change_src { Some(change) => { if let Changeset::EmptySet { versions, ts } = change.changeset { - buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts)); + buf.entry(change.actor_id).or_default().push_back((versions.clone(), ts)); } else { warn!("received non-emptyset changes in emptyset channel from {}", change.actor_id); } @@ -618,9 +620,9 @@ pub async fn process_emptyset( ) -> Result<(), ChangeError> { let (versions, ts) = changes; - let mut version_iter = versions.chunks(100); + let version_iter = versions.chunks(100); - while let Some(chunk) = version_iter.next() { + for chunk in version_iter { let mut conn = agent.pool().write_low().await?; debug!("processing emptyset from {:?}", actor_id); let booked = { @@ -743,7 +745,7 @@ pub async fn handle_changes( agent.config().perf.apply_queue_timeout as u64, )); - const MAX_SEEN_CACHE_LEN: usize = 10000; + const MAX_SEEN_CACHE_LEN: usize = 1000; const KEEP_SEEN_CACHE_SIZE: usize = 1000; let mut seen: IndexMap<_, RangeInclusiveSet> = IndexMap::new(); @@ -872,21 +874,7 @@ pub async fn handle_changes( } } - drop_log_count += 1; - if is_pow_10(drop_log_count) { - if drop_log_count == 1 { - warn!("dropped an old change because changes queue is full"); - } else { - warn!( - "droppped {} old changes because changes queue is full", - drop_log_count - ); - } - } - // reset count - if drop_log_count == 100000000 { - drop_log_count = 0; - } + log_at_pow_10("dropped old change from queue", &mut drop_log_count); } if let Some(mut seqs) = change.seqs().cloned() { @@ -1170,11 +1158,3 @@ mod tests { Ok(()) } } - -#[inline] -fn is_pow_10(i: u64) -> bool { - matches!( - i, - 1 | 10 | 100 | 1000 | 10000 | 1000000 | 10000000 | 100000000 - ) -} diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index 0c1238f1..fd06cebe 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -969,7 +969,6 @@ pub async fn process_multiple_changes( .snapshot() }; - snap.update_cleared_ts(&tx, ts) .map_err(|source| ChangeError::Rusqlite { source, @@ -988,12 +987,11 @@ pub async fn process_multiple_changes( if let Some(ts) = last_cleared { let mut booked_writer = agent - .booked() - .blocking_write("process_multiple_changes(update_cleared_ts)"); + .booked() + .blocking_write("process_multiple_changes(update_cleared_ts)"); booked_writer.update_cleared_ts(ts); } - for (_, changeset, _, _) in changesets.iter() { if let Some(ts) = changeset.ts() { let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration(); @@ -1318,3 +1316,21 @@ pub fn check_buffered_meta_to_clear( conn.prepare_cached("SELECT EXISTS(SELECT 1 FROM __corro_seq_bookkeeping WHERE site_id = ? AND version >= ? AND version <= ?)")?.query_row(params![actor_id, versions.start(), versions.end()], |row| row.get(0)) } + +pub fn log_at_pow_10(msg: &str, count: &mut u64) { + if is_pow_10(*count + 1) { + warn!("{} (log count: {})", msg, count) + } + // reset count + if *count == 100000000 { + *count = 0; + } +} + +#[inline] +fn is_pow_10(i: u64) -> bool { + matches!( + i, + 1 | 10 | 100 | 1000 | 10000 | 1000000 | 10000000 | 100000000 + ) +} diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index 210069e7..bdcb90cb 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, net::SocketAddr, num::NonZeroU32, pin::Pin, @@ -26,7 +26,7 @@ use speedy::Writable; use strum::EnumDiscriminants; use tokio::{ sync::mpsc, - task::{block_in_place, LocalSet}, + task::{block_in_place, JoinSet, LocalSet}, time::interval, }; use tokio_stream::StreamExt; @@ -41,7 +41,7 @@ use corro_types::{ channel::{bounded, CorroReceiver, CorroSender}, }; -use crate::transport::Transport; +use crate::{agent::util::log_at_pow_10, transport::Transport}; #[derive(Clone)] struct TimerSpawner { @@ -374,6 +374,10 @@ pub fn runtime_loop( } }); + let mut join_set = JoinSet::new(); + let max_queue_len = agent.config().perf.processing_queue_len; + const MAX_INFLIGHT_BROADCAST: usize = 10000; + tokio::spawn(async move { const BROADCAST_CUTOFF: usize = 64 * 1024; @@ -406,7 +410,8 @@ pub fn runtime_loop( let mut tripped = false; let mut ser_buf = BytesMut::new(); - let mut to_broadcast = vec![]; + let mut to_broadcast = VecDeque::new(); + let mut log_count = 0; loop { let branch = tokio::select! { @@ -447,10 +452,10 @@ pub fn runtime_loop( } Branch::BroadcastTick => { if !bcast_buf.is_empty() { - to_broadcast.push(PendingBroadcast::new(bcast_buf.split().freeze())); + to_broadcast.push_front(PendingBroadcast::new(bcast_buf.split().freeze())); } if !local_bcast_buf.is_empty() { - to_broadcast.push(PendingBroadcast::new_local( + to_broadcast.push_front(PendingBroadcast::new_local( local_bcast_buf.split().freeze(), )); } @@ -501,7 +506,7 @@ pub fn runtime_loop( } if local_bcast_buf.len() >= BROADCAST_CUTOFF { - to_broadcast.push(PendingBroadcast::new_local( + to_broadcast.push_front(PendingBroadcast::new_local( local_bcast_buf.split().freeze(), )); } @@ -514,13 +519,14 @@ pub fn runtime_loop( } if bcast_buf.len() >= BROADCAST_CUTOFF { - to_broadcast.push(PendingBroadcast::new(bcast_buf.split().freeze())); + to_broadcast + .push_front(PendingBroadcast::new(bcast_buf.split().freeze())); } } } Branch::WokePendingBroadcast(pending) => { trace!("handling Branch::WokePendingBroadcast"); - to_broadcast.push(pending); + to_broadcast.push_front(pending); } Branch::Metrics => { trace!("handling Branch::Metrics"); @@ -531,7 +537,10 @@ pub fn runtime_loop( } } - for mut pending in to_broadcast.drain(..).rev() { + while join_set.try_join_next().is_some() {} + + while !to_broadcast.is_empty() && join_set.len() < MAX_INFLIGHT_BROADCAST { + let mut pending = to_broadcast.pop_front().unwrap(); trace!("{} to broadcast: {pending:?}", actor_id); let (member_count, max_transmissions) = { @@ -574,7 +583,7 @@ pub fn runtime_loop( for addr in broadcast_to { debug!(actor = %actor_id, "broadcasting {} bytes to: {addr}", pending.payload.len()); - tokio::spawn(transmit_broadcast( + join_set.spawn(transmit_broadcast( pending.payload.clone(), transport.clone(), addr, @@ -597,6 +606,18 @@ pub fn runtime_loop( } } } + + // if broadcast queue is over the max, drop the oldest, most sent item + if to_broadcast.len() > max_queue_len { + let max_sent = to_broadcast + .iter() + .enumerate() + .max_by_key(|(_, val)| val.send_count); + if let Some((i, _)) = max_sent { + to_broadcast.remove(i); + log_at_pow_10("dropped broadcast from queue", &mut log_count); + }; + } } info!("broadcasts are done"); });