From be67362257337a99255aeba58970c812cedc9afb Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Fri, 23 Aug 2024 13:35:23 +0100 Subject: [PATCH 01/10] working function --- crates/corro-agent/src/agent/util.rs | 5 +- crates/corro-agent/src/api/peer.rs | 157 +++++++----- crates/corro-types/src/agent.rs | 1 - crates/corro-types/src/broadcast.rs | 18 +- crates/corro-types/src/sync.rs | 347 +++++++++++++++++++++++++-- 5 files changed, 452 insertions(+), 76 deletions(-) diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index 0c1238f1..826f239e 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -988,12 +988,11 @@ pub async fn process_multiple_changes( if let Some(ts) = last_cleared { let mut booked_writer = agent - .booked() - .blocking_write("process_multiple_changes(update_cleared_ts)"); + .booked() + .blocking_write("process_multiple_changes(update_cleared_ts)"); booked_writer.update_cleared_ts(ts); } - for (_, changeset, _, _) in changesets.iter() { if let Some(ts) = changeset.ts() { let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration(); diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 4522503b..85c60520 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -942,16 +942,6 @@ async fn process_sync( Ok(()) } -fn chunk_range + std::cmp::Ord + Copy>( - range: RangeInclusive, - chunk_size: usize, -) -> impl Iterator> { - range.clone().step_by(chunk_size).map(move |block_start| { - let block_end = (block_start + chunk_size as u64).min(*range.end()); - block_start..=block_end - }) -} - fn encode_sync_msg( codec: &mut LengthDelimitedCodec, encode_buf: &mut BytesMut, @@ -1127,22 +1117,7 @@ pub async fn parallel_sync( counter!("corro.sync.client.member", "id" => actor_id.to_string(), "addr" => addr.to_string()).increment(1); - let mut needs = our_sync_state.compute_available_needs(&their_sync_state); - - trace!(%actor_id, self_actor_id = %agent.actor_id(), "computed needs"); - - let cleared_ts = their_sync_state.last_cleared_ts; - - if let Some(ts) = cleared_ts { - if let Some(last_seen) = our_empty_ts.get(&actor_id) { - if last_seen.is_none() || last_seen.unwrap() < ts { - debug!(%actor_id, "got last cleared ts {cleared_ts:?} - out last_seen {last_seen:?}"); - needs.entry(actor_id).or_default().push( SyncNeedV1::Empty { ts: *last_seen }); - } - } - } - - Ok::<_, SyncError>((needs, tx, read)) + Ok::<_, SyncError>((their_sync_state, tx, read)) }.await ) }.instrument(info_span!("sync_client_handshake", %actor_id, %addr)) @@ -1150,15 +1125,15 @@ pub async fn parallel_sync( .collect::)>>() .await; - debug!("collected member needs and such!"); + debug!("collected member state and such!"); #[allow(clippy::manual_try_fold)] let syncers = results .into_iter() .fold(Ok(vec![]), |agg, (actor_id, addr, res)| match res { - Ok((needs, tx, read)) => { + Ok((state, tx, read)) => { let mut v = agg.unwrap_or_default(); - v.push((actor_id, addr, needs, tx, read)); + v.push((actor_id, addr, state, tx, read)); Ok(v) } Err(e) => { @@ -1177,12 +1152,26 @@ pub async fn parallel_sync( })?; let len = syncers.len(); + let actor_state: Vec<_> = syncers.iter().map(|x| (x.0, x.2.clone())).collect(); + let actor_needs = distribute_available_needs(our_sync_state, actor_state); let (readers, mut servers) = { let mut rng = rand::thread_rng(); syncers.into_iter().fold( (Vec::with_capacity(len), Vec::with_capacity(len)), - |(mut readers, mut servers), (actor_id, addr, needs, tx, read)| { + |(mut readers, mut servers), (actor_id, addr, state, tx, read)| { + let mut needs = actor_needs.get(&actor_id).cloned().unwrap_or_default(); + + let cleared_ts = state.last_cleared_ts; + if let Some(ts) = cleared_ts { + if let Some(last_seen) = our_empty_ts.get(&actor_id) { + if last_seen.is_none() || last_seen.unwrap() < ts { + debug!(%actor_id, "got last cleared ts {cleared_ts:?} - out last_seen {last_seen:?}"); + needs.push((actor_id, SyncNeedV1::Empty { ts: *last_seen })); + } + } + } + if needs.is_empty() { trace!(%actor_id, "no needs!"); return (readers, servers); @@ -1191,36 +1180,16 @@ pub async fn parallel_sync( trace!(%actor_id, "needs: {needs:?}"); - debug!(%actor_id, %addr, "needs len: {}", needs.values().map(|needs| needs.iter().map(|need| match need { + debug!(%actor_id, %addr, "needs len: {}", needs.iter().map(|(_, need)| match need { SyncNeedV1::Full {versions} => (versions.end().0 - versions.start().0) as usize + 1, SyncNeedV1::Partial {..} => 0, SyncNeedV1::Empty {..} => 0, - }).sum::()).sum::()); + }).sum::()); - let actor_needs = needs - .into_iter() - .flat_map(|(actor_id, needs)| { - let mut needs: Vec<_> = needs - .into_iter() - .flat_map(|need| match need { - // chunk the versions, sometimes it's 0..=1000000 and that's far too big for a chunk! - SyncNeedV1::Full { versions } => chunk_range(versions, 10) - .map(|versions| SyncNeedV1::Full { versions }) - .collect(), - - need => vec![need], - }) - .collect(); + needs.shuffle(&mut rng); - // NOTE: IMPORTANT! shuffle the vec so we don't keep looping over the same later on - needs.shuffle(&mut rng); - needs - .into_iter() - .map(|need| (actor_id, need)) - .collect::>() - }) - .collect::>(); + let actor_needs = needs.into_iter().collect::>(); servers.push(( actor_id, @@ -1466,6 +1435,40 @@ pub async fn parallel_sync( .sum::()) } +pub fn distribute_available_needs( + mut our_state: SyncStateV1, + mut states: Vec<(ActorId, SyncStateV1)>, +) -> HashMap> { + let mut final_needs: HashMap> = HashMap::new(); + + while !states.is_empty() { + let mut remove_keys = vec![]; + for (actor_id, state) in &states { + let actor_needs = our_state.get_n_needs(&state, 10); + // we can get no more needs from this actor + if actor_needs.is_empty() { + remove_keys.push(*actor_id); + } else { + let needs: Vec<_> = actor_needs + .clone() + .into_iter() + .flat_map(|(actor_id, needs)| { + needs.into_iter().map(move |need| (actor_id, need)) + }) + .collect(); + final_needs + .entry(*actor_id) + .or_default() + .extend_from_slice(&needs); + our_state.merge_needs(&actor_needs); + } + } + states.retain(|(actor, _)| !remove_keys.contains(actor)); + } + + final_needs +} + #[tracing::instrument(skip(agent, bookie, their_actor_id, read, write), fields(actor_id = %their_actor_id), err)] pub async fn serve_sync( agent: &Agent, @@ -1732,6 +1735,7 @@ mod tests { use rand::{Rng, RngCore}; use tempfile::TempDir; use tripwire::Tripwire; + use uuid::Uuid; use crate::{ agent::{process_multiple_changes, setup}, @@ -1739,6 +1743,51 @@ mod tests { }; use super::*; + #[test] + fn test_get_needs() -> eyre::Result<()> { + let original_state: SyncStateV1 = SyncStateV1::default(); + + let actor1 = ActorId(Uuid::new_v4()); + let actor2 = ActorId(Uuid::new_v4()); + let mut actor1_state = SyncStateV1::default(); + actor1_state.heads.insert(actor1, Version(60)); + actor1_state.heads.insert(actor2, Version(20)); + + let mut actor2_state = SyncStateV1::default(); + actor2_state.heads.insert(actor1, Version(60)); + actor2_state.heads.insert(actor2, Version(20)); + + let needs_map = distribute_available_needs( + original_state.clone(), + vec![ + (actor1, actor1_state.clone()), + (actor2, actor2_state.clone()), + ], + ); + println!("{:#?}", needs_map); + + let actor3 = ActorId(Uuid::new_v4()); + let mut actor3_state = SyncStateV1::default(); + actor3_state.heads.insert(actor3, Version(40)); + // actor 2 has seen only till Version(20) + actor2_state.heads.insert(actor3, Version(10)); + // actor 1 has seen up to 40 but has some needs + actor1_state.heads.insert(actor3, Version(40)); + actor1_state + .need + .insert(actor3, vec![(Version(3)..=Version(20))]); + + let needs_map = distribute_available_needs( + original_state, + vec![ + (actor1, actor1_state), + (actor2, actor2_state), + (actor3, actor3_state), + ], + ); + println!("{:#?}", needs_map); + Ok(()) + } #[tokio::test(flavor = "multi_thread")] async fn test_handle_need() -> eyre::Result<()> { diff --git a/crates/corro-types/src/agent.rs b/crates/corro-types/src/agent.rs index e199e53a..571a03aa 100644 --- a/crates/corro-types/src/agent.rs +++ b/crates/corro-types/src/agent.rs @@ -1213,7 +1213,6 @@ impl VersionsSnapshot { conn.prepare_cached("INSERT OR REPLACE INTO __corro_sync_state VALUES (?, ?)")? .execute((self.actor_id, ts))?; } - Ok(()) } diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs index d2ea9b61..eb1b90cd 100644 --- a/crates/corro-types/src/broadcast.rs +++ b/crates/corro-types/src/broadcast.rs @@ -1,4 +1,9 @@ -use std::{cmp, fmt, io, num::NonZeroU32, ops::{Deref, RangeInclusive}, time::Duration}; +use std::{ + cmp, fmt, io, + num::NonZeroU32, + ops::{Deref, RangeInclusive}, + time::Duration, +}; use bytes::{Bytes, BytesMut}; use corro_api_types::{row_to_change, Change}; @@ -165,9 +170,14 @@ impl Changeset { // determine the estimated resource cost of processing a change pub fn processing_cost(&self) -> usize { match self { - Changeset::Empty { versions, .. } => cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20), - Changeset::EmptySet { versions, .. } => versions.iter().map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)).sum::(), - Changeset::Full { changes, ..} => changes.len(), + Changeset::Empty { versions, .. } => { + cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20) + } + Changeset::EmptySet { versions, .. } => versions + .iter() + .map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)) + .sum::(), + Changeset::Full { changes, .. } => changes.len(), } } diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 303dec61..72dda7ee 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -85,8 +85,72 @@ pub struct SyncStateV1 { #[speedy(default_on_eof)] pub last_cleared_ts: Option, } - impl SyncStateV1 { + pub fn contains(&self, actor_id: &ActorId, versions: &RangeInclusive) -> bool { + !(self.heads.get(actor_id).cloned().unwrap_or_default() < *versions.end() + || self + .need + .get(actor_id) + .cloned() + .unwrap_or_default() + .iter() + .any(|x| x.start() < versions.end() && versions.start() < x.end())) + } + + pub fn merge_needs(&mut self, needs: &HashMap>) { + for (actor, need) in needs { + for n in need { + match n { + SyncNeedV1::Full { versions } => { + self.merge_full_version(*actor, versions); + } + SyncNeedV1::Partial { version, seqs } => { + let mut delete = false; + self.partial_need.entry(*actor).and_modify(|e| { + if let Some(need_seqs) = e.get_mut(version) { + let mut seqs_set = + RangeInclusiveSet::from_iter(need_seqs.clone().into_iter()); + for seq in seqs { + seqs_set.remove(seq.clone()) + } + *need_seqs = Vec::from_iter(seqs_set.clone().into_iter()); + if need_seqs.is_empty() { + delete = true + } + }; + }); + + // we have gotten all sequence numbers, delete need + if delete { + if let Some(partials) = self.partial_need.get_mut(actor) { + partials.remove(version); + }; + } + } + _ => {} + } + } + } + } + + pub fn merge_full_version(&mut self, actor_id: ActorId, version: &RangeInclusive) { + let head = self.heads.entry(actor_id).or_default(); + if version.end() > head { + // check for gaps + if *head + 1 < *version.start() { + let range = *head + 1..=*version.start() - 1; + self.need.entry(actor_id).or_default().push(range); + } + *head = *version.end(); + } + + self.need.entry(actor_id).and_modify(|e| { + let mut set = RangeInclusiveSet::from_iter(e.clone().into_iter()); + set.remove(version.clone()); + *e = Vec::from_iter(set.into_iter()); + }); + } + pub fn need_len(&self) -> u64 { self.need .values() @@ -124,6 +188,147 @@ impl SyncStateV1 { .unwrap_or(0) } + pub fn get_n_needs(&self, other: &SyncStateV1, n: u64) -> HashMap> { + let mut needs: HashMap> = HashMap::new(); + let mut total = 0; + + for (actor_id, head) in other.heads.iter() { + if *actor_id == self.actor_id { + continue; + } + if *head == Version(0) { + warn!(actor_id = %other.actor_id, "sent a 0 head version for actor id {}", actor_id); + continue; + } + let other_haves = { + let mut haves = RangeInclusiveSet::from_iter([(Version(1)..=*head)].into_iter()); + + // remove needs + if let Some(other_need) = other.need.get(actor_id) { + for need in other_need.iter() { + // create gaps + haves.remove(need.clone()); + } + } + + // remove partials + if let Some(other_partials) = other.partial_need.get(actor_id) { + for (v, _) in other_partials.iter() { + haves.remove(*v..=*v); + } + } + + // we are left with all the versions they fully have! + haves + }; + + if let Some(our_need) = self.need.get(actor_id) { + for range in our_need.iter() { + for overlap in other_haves.overlapping(range) { + let start = cmp::max(range.start(), overlap.start()); + let end = cmp::min(range.end(), overlap.end()); + let left = n - total; + let new_end = cmp::min(*end, *start + left); + needs.entry(*actor_id).or_default().push(SyncNeedV1::Full { + versions: *start..=new_end, + }); + total += end.0 - start.0; + if total >= n { + return needs; + } + } + } + } + + if let Some(our_partials) = self.partial_need.get(actor_id) { + for (v, seqs) in our_partials.iter() { + if other_haves.contains(v) { + needs + .entry(*actor_id) + .or_default() + .push(SyncNeedV1::Partial { + version: *v, + seqs: seqs.clone(), + }); + } else if let Some(other_seqs) = other + .partial_need + .get(actor_id) + .and_then(|versions| versions.get(v)) + { + let max_other_seq = other_seqs.iter().map(|range| *range.end()).max(); + let max_our_seq = seqs.iter().map(|range| *range.end()).max(); + + let end_seq = cmp::max(max_other_seq, max_our_seq); + + if let Some(end) = end_seq { + let mut other_seqs_haves = + RangeInclusiveSet::from_iter([CrsqlSeq(0)..=end]); + + for seqs in other_seqs.iter() { + other_seqs_haves.remove(seqs.clone()); + } + + let seqs = seqs + .iter() + .flat_map(|range| { + other_seqs_haves + .overlapping(range) + .map(|overlap| { + let start = cmp::max(range.start(), overlap.start()); + let end = cmp::min(range.end(), overlap.end()); + *start..=*end + }) + .collect::>>() + }) + .collect::>>(); + + if !seqs.is_empty() { + needs + .entry(*actor_id) + .or_default() + .push(SyncNeedV1::Partial { version: *v, seqs }); + } + total += 1; + if total >= n { + return needs; + } + } + } + } + } + + let left = n - total; + let missing = match self.heads.get(actor_id) { + Some(our_head) => { + if head > our_head { + let new_head = cmp::min(*our_head + left, *head); + Some((*our_head + 1)..=new_head) + } else { + None + } + } + None => { + let new_head = Version(cmp::min(head.0, left)); + Some(Version(1)..=new_head) + } + }; + + if let Some(missing) = missing { + total += missing.end().0 + missing.start().0; + needs + .entry(*actor_id) + .or_default() + .push(SyncNeedV1::Full { versions: missing }); + } + + if total >= n { + return needs; + } + } + + needs + } + pub fn compute_available_needs( &self, other: &SyncStateV1, @@ -157,10 +362,10 @@ impl SyncStateV1 { } // we are left with all the versions they fully have! - haves }; + println!("actor - {actor_id}, haves - {other_haves:?}"); if let Some(our_need) = self.need.get(actor_id) { for range in our_need.iter() { for overlap in other_haves.overlapping(range) { @@ -238,10 +443,21 @@ impl SyncStateV1 { }; if let Some(missing) = missing { - needs - .entry(*actor_id) - .or_default() - .push(SyncNeedV1::Full { versions: missing }); + let mut missing = RangeInclusiveSet::from_iter([missing].into_iter()); + if let Some(other_needs) = other.need.get(actor_id) { + // remove needs + for need in other_needs.iter() { + // create gaps + missing.remove(need.clone()); + } + } + + missing.into_iter().for_each(|v| { + needs + .entry(*actor_id) + .or_default() + .push(SyncNeedV1::Full { versions: v }); + }); } } @@ -300,24 +516,31 @@ pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncState let mut last_ts = None; for (actor_id, booked) in actors { - let bookedr = booked - .read(format!("generate_sync:{}", actor_id.as_simple())) - .await; + let (last_version, needs, partials, last_cleared_ts) = { + let bookedr = booked + .read(format!("generate_sync:{}", actor_id.as_simple())) + .await; + ( + bookedr.last(), + bookedr.needed().clone(), + bookedr.partials.clone(), + bookedr.last_cleared_ts(), + ) + }; - let last_version = match { bookedr.last() } { + let last_version = match last_version { None => continue, Some(v) => v, }; - let need: Vec<_> = bookedr.needed().iter().cloned().collect(); + let need: Vec<_> = needs.iter().cloned().collect(); if !need.is_empty() { state.need.insert(actor_id, need); } { - for (v, partial) in bookedr - .partials + for (v, partial) in partials .iter() // don't set partial if it is effectively complete .filter(|(_, partial)| !partial.is_complete()) @@ -333,7 +556,7 @@ pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncState } if actor_id == self_actor_id { - last_ts = bookedr.last_cleared_ts(); + last_ts = last_cleared_ts; } state.heads.insert(actor_id, last_version); @@ -391,10 +614,106 @@ impl SyncMessage { #[cfg(test)] mod tests { + use itertools::Itertools; use uuid::Uuid; use super::*; + #[test] + fn test_merge_need() { + let actor1 = ActorId(Uuid::new_v4()); + + let mut state = SyncStateV1::default(); + + let mut needs = HashMap::new(); + needs.insert( + actor1, + vec![SyncNeedV1::Full { + versions: Version(1)..=Version(50), + }], + ); + state.merge_needs(&needs); + assert_eq!(state.heads.get(&actor1).unwrap(), &Version(50)); + assert!(state.need.get(&actor1).is_none()); + assert!(state.partial_need.get(&actor1).is_none()); + + needs.get_mut(&actor1).unwrap().push(SyncNeedV1::Full { + versions: Version(70)..=Version(90), + }); + state.merge_needs(&needs); + assert_eq!(state.heads.get(&actor1).unwrap(), &Version(90)); + assert!(state + .need + .get(&actor1) + .unwrap() + .iter() + .contains(&(Version(51)..=Version(69)))); + assert!(state.partial_need.get(&actor1).is_none()); + + needs.get_mut(&actor1).unwrap().push(SyncNeedV1::Full { + versions: Version(60)..=Version(65), + }); + state.merge_needs(&needs); + assert_eq!(state.heads.get(&actor1).unwrap(), &Version(90)); + assert!(state + .need + .get(&actor1) + .unwrap() + .iter() + .contains(&(Version(51)..=Version(59)))); + assert!(state + .need + .get(&actor1) + .unwrap() + .iter() + .contains(&(Version(66)..=Version(69)))); + assert!(state.partial_need.get(&actor1).is_none()); + + needs.get_mut(&actor1).unwrap().push(SyncNeedV1::Partial { + version: Version(40), + seqs: vec![CrsqlSeq(22)..=CrsqlSeq(25)], + }); + state + .partial_need + .entry(actor1) + .or_default() + .entry(Version(40)) + .or_default() + .extend_from_slice(&vec![ + CrsqlSeq(1)..=CrsqlSeq(10), + CrsqlSeq(20)..=CrsqlSeq(25), + ]); + state.merge_needs(&needs); + assert!(state + .partial_need + .get(&actor1) + .unwrap() + .get(&Version(40)) + .unwrap() + .contains(&(CrsqlSeq(1)..=CrsqlSeq(10)))); + assert!(state + .partial_need + .get(&actor1) + .unwrap() + .get(&Version(40)) + .unwrap() + .contains(&(CrsqlSeq(20)..=CrsqlSeq(21)))); + + let mut needs= HashMap::new(); + needs.insert(actor1, vec![SyncNeedV1::Partial { + version: Version(40), + seqs: vec![CrsqlSeq(1)..=CrsqlSeq(10)], + }, SyncNeedV1::Partial { + version: Version(40), + seqs: vec![CrsqlSeq(20)..=CrsqlSeq(21)], + }]); + state.merge_needs(&needs); + assert!(state + .partial_need + .get(&actor1) + .unwrap().is_empty()); + } + #[test] fn test_compute_available_needs() { let actor1 = ActorId(Uuid::new_v4()); From 74621b7ceddccbab184d09d340381eb11c990305 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Mon, 26 Aug 2024 18:07:41 +0100 Subject: [PATCH 02/10] remove get_needs test --- crates/corro-agent/src/api/peer.rs | 46 ------------------------------ crates/corro-types/src/sync.rs | 2 +- 2 files changed, 1 insertion(+), 47 deletions(-) diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 85c60520..caef070c 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1742,52 +1742,6 @@ mod tests { api::public::api_v1_db_schema, }; - use super::*; - #[test] - fn test_get_needs() -> eyre::Result<()> { - let original_state: SyncStateV1 = SyncStateV1::default(); - - let actor1 = ActorId(Uuid::new_v4()); - let actor2 = ActorId(Uuid::new_v4()); - let mut actor1_state = SyncStateV1::default(); - actor1_state.heads.insert(actor1, Version(60)); - actor1_state.heads.insert(actor2, Version(20)); - - let mut actor2_state = SyncStateV1::default(); - actor2_state.heads.insert(actor1, Version(60)); - actor2_state.heads.insert(actor2, Version(20)); - - let needs_map = distribute_available_needs( - original_state.clone(), - vec![ - (actor1, actor1_state.clone()), - (actor2, actor2_state.clone()), - ], - ); - println!("{:#?}", needs_map); - - let actor3 = ActorId(Uuid::new_v4()); - let mut actor3_state = SyncStateV1::default(); - actor3_state.heads.insert(actor3, Version(40)); - // actor 2 has seen only till Version(20) - actor2_state.heads.insert(actor3, Version(10)); - // actor 1 has seen up to 40 but has some needs - actor1_state.heads.insert(actor3, Version(40)); - actor1_state - .need - .insert(actor3, vec![(Version(3)..=Version(20))]); - - let needs_map = distribute_available_needs( - original_state, - vec![ - (actor1, actor1_state), - (actor2, actor2_state), - (actor3, actor3_state), - ], - ); - println!("{:#?}", needs_map); - Ok(()) - } #[tokio::test(flavor = "multi_thread")] async fn test_handle_need() -> eyre::Result<()> { diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 72dda7ee..dde72638 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -113,7 +113,7 @@ impl SyncStateV1 { for seq in seqs { seqs_set.remove(seq.clone()) } - *need_seqs = Vec::from_iter(seqs_set.clone().into_iter()); + *need_seqs = Vec::from_iter(seqs_set.into_iter()); if need_seqs.is_empty() { delete = true } From b26bbf18fa31f32227b271aa8e2bb6b104515101 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Mon, 26 Aug 2024 19:51:26 +0100 Subject: [PATCH 03/10] comment out test due to flakiness --- crates/corro-agent/src/api/peer.rs | 1 - crates/corro-types/src/sync.rs | 153 +++++++++++++++++++++++++---- 2 files changed, 132 insertions(+), 22 deletions(-) diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index caef070c..e23a60af 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1742,7 +1742,6 @@ mod tests { api::public::api_v1_db_schema, }; - #[tokio::test(flavor = "multi_thread")] async fn test_handle_need() -> eyre::Result<()> { _ = tracing_subscriber::fmt::try_init(); diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index dde72638..100efcb0 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -232,7 +232,7 @@ impl SyncStateV1 { needs.entry(*actor_id).or_default().push(SyncNeedV1::Full { versions: *start..=new_end, }); - total += end.0 - start.0; + total += new_end.0 - start.0 + 1; if total >= n { return needs; } @@ -250,6 +250,10 @@ impl SyncStateV1 { version: *v, seqs: seqs.clone(), }); + total += 1; + if total >= n { + return needs; + } } else if let Some(other_seqs) = other .partial_need .get(actor_id) @@ -314,11 +318,28 @@ impl SyncStateV1 { }; if let Some(missing) = missing { - total += missing.end().0 + missing.start().0; - needs - .entry(*actor_id) - .or_default() - .push(SyncNeedV1::Full { versions: missing }); + let mut missing = RangeInclusiveSet::from_iter([missing].into_iter()); + if let Some(other_needs) = other.need.get(actor_id) { + // remove needs + for need in other_needs.iter() { + missing.remove(need.clone()); + } + } + + // remove partial needs + if let Some(partials) = other.partial_need.get(actor_id) { + for (v, _) in partials { + missing.remove(*v..=*v); + } + } + + missing.into_iter().for_each(|v| { + total += v.end().0 - v.start().0 + 1; + needs + .entry(*actor_id) + .or_default() + .push(SyncNeedV1::Full { versions: v }); + }); } if total >= n { @@ -365,7 +386,6 @@ impl SyncStateV1 { haves }; - println!("actor - {actor_id}, haves - {other_haves:?}"); if let Some(our_need) = self.need.get(actor_id) { for range in our_need.iter() { for overlap in other_haves.overlapping(range) { @@ -447,11 +467,17 @@ impl SyncStateV1 { if let Some(other_needs) = other.need.get(actor_id) { // remove needs for need in other_needs.iter() { - // create gaps missing.remove(need.clone()); } } + // remove partial needs + if let Some(partials) = other.partial_need.get(actor_id) { + for (v, _) in partials { + missing.remove(*v..=*v); + } + } + missing.into_iter().for_each(|v| { needs .entry(*actor_id) @@ -495,7 +521,6 @@ impl From for SyncMessage { } } - // generates a `SyncMessage` to tell another node what versions we're missing #[tracing::instrument(skip_all, level = "debug")] pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncStateV1 { @@ -619,6 +644,89 @@ mod tests { use super::*; + // TODO: this test occasionally fails because get_n_needs loops over a HashMap which is + // unordered. This could probably be fixed by using a BTreeMap instead + // #[test] + // fn test_get_n_needs() -> eyre::Result<()> { + // let mut original_state: SyncStateV1 = SyncStateV1::default(); + // + // // static strings so the order is predictible + // let actor1 = ActorId(Uuid::parse_str("adea794c-8bb8-4ca6-b04b-87ec22348326").unwrap()); + // let actor2 = ActorId(Uuid::parse_str("0dea794c-8bb8-4ca6-b04b-87ec22348326").unwrap()); + // + // let mut actor1_state = SyncStateV1::default(); + // actor1_state.heads.insert(actor1, Version(20)); + // let expected = HashMap::from([( + // actor1, + // vec![SyncNeedV1::Full { + // versions: Version(1)..=Version(10), + // }], + // )]); + // assert_eq!(original_state.get_n_needs(&actor1_state, 10), expected); + // + // let mut actor1_state = SyncStateV1::default(); + // actor1_state.heads.insert(actor1, Version(8)); + // actor1_state.heads.insert(actor2, Version(20)); + // let got = original_state.get_n_needs(&actor1_state, 10); + // let expected = HashMap::from([ + // ( + // actor1, + // vec![SyncNeedV1::Full { + // versions: Version(1)..=Version(8), + // }], + // ), + // ( + // actor2, + // vec![SyncNeedV1::Full { + // versions: Version(1)..=Version(2), + // }], + // ), + // ]); + // assert_eq!(got, expected); + // + // let mut actor1_state = SyncStateV1::default(); + // actor1_state.heads.insert(actor1, Version(30)); + // actor1_state.partial_need.insert( + // actor1, + // HashMap::from([(Version(13), vec![CrsqlSeq(20)..=CrsqlSeq(25)])]), + // ); + // actor1_state + // .need + // .insert(actor1, vec![Version(21)..=Version(24)]); + // + // original_state.heads.insert(actor1, Version(10)); + // original_state + // .need + // .insert(actor1, vec![Version(4)..=Version(5)]); + // original_state.partial_need.insert( + // actor1, + // HashMap::from([(Version(9), vec![CrsqlSeq(1)..=CrsqlSeq(10)])]), + // ); + // + // let got = original_state.get_n_needs(&actor1_state, 10); + // let expected = HashMap::from([( + // actor1, + // vec![ + // SyncNeedV1::Full { + // versions: Version(4)..=Version(5), + // }, + // SyncNeedV1::Partial { + // version: Version(9), + // seqs: vec![CrsqlSeq(1)..=CrsqlSeq(10)], + // }, + // SyncNeedV1::Full { + // versions: Version(11)..=Version(12), + // }, + // SyncNeedV1::Full { + // versions: Version(14)..=Version(17), + // }, + // ], + // )]); + // assert_eq!(got, expected); + // + // Ok(()) + // } + #[test] fn test_merge_need() { let actor1 = ActorId(Uuid::new_v4()); @@ -699,19 +807,22 @@ mod tests { .unwrap() .contains(&(CrsqlSeq(20)..=CrsqlSeq(21)))); - let mut needs= HashMap::new(); - needs.insert(actor1, vec![SyncNeedV1::Partial { - version: Version(40), - seqs: vec![CrsqlSeq(1)..=CrsqlSeq(10)], - }, SyncNeedV1::Partial { - version: Version(40), - seqs: vec![CrsqlSeq(20)..=CrsqlSeq(21)], - }]); + let mut needs = HashMap::new(); + needs.insert( + actor1, + vec![ + SyncNeedV1::Partial { + version: Version(40), + seqs: vec![CrsqlSeq(1)..=CrsqlSeq(10)], + }, + SyncNeedV1::Partial { + version: Version(40), + seqs: vec![CrsqlSeq(20)..=CrsqlSeq(21)], + }, + ], + ); state.merge_needs(&needs); - assert!(state - .partial_need - .get(&actor1) - .unwrap().is_empty()); + assert!(state.partial_need.get(&actor1).unwrap().is_empty()); } #[test] From 92e27b1a3928d707167d388394d2b38bba9b1974 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Mon, 26 Aug 2024 20:03:20 +0100 Subject: [PATCH 04/10] add back import --- crates/corro-agent/src/api/peer.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index e23a60af..5d915eb0 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1735,13 +1735,15 @@ mod tests { use rand::{Rng, RngCore}; use tempfile::TempDir; use tripwire::Tripwire; - use uuid::Uuid; use crate::{ agent::{process_multiple_changes, setup}, api::public::api_v1_db_schema, }; + use super::*; + + #[tokio::test(flavor = "multi_thread")] async fn test_handle_need() -> eyre::Result<()> { _ = tracing_subscriber::fmt::try_init(); From 9d6ac2a773e367a70cccd1f022e412b387ebcec0 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Tue, 27 Aug 2024 15:20:56 +0100 Subject: [PATCH 05/10] use BTreeMap and enable test --- crates/corro-types/src/sync.rs | 212 +++++++++++++++++---------------- 1 file changed, 111 insertions(+), 101 deletions(-) diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 100efcb0..6f220bd2 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::{cmp, collections::HashMap, io, ops::RangeInclusive}; use bytes::BytesMut; @@ -79,9 +80,9 @@ pub enum SyncRejectionV1 { #[derive(Debug, Default, Clone, PartialEq, Readable, Writable, Serialize, Deserialize)] pub struct SyncStateV1 { pub actor_id: ActorId, - pub heads: HashMap, - pub need: HashMap>>, - pub partial_need: HashMap>>>, + pub heads: BTreeMap, + pub need: BTreeMap>>, + pub partial_need: BTreeMap>>>, #[speedy(default_on_eof)] pub last_cleared_ts: Option, } @@ -227,7 +228,7 @@ impl SyncStateV1 { for overlap in other_haves.overlapping(range) { let start = cmp::max(range.start(), overlap.start()); let end = cmp::min(range.end(), overlap.end()); - let left = n - total; + let left = n - total - 1; let new_end = cmp::min(*end, *start + left); needs.entry(*actor_id).or_default().push(SyncNeedV1::Full { versions: *start..=new_end, @@ -301,20 +302,15 @@ impl SyncStateV1 { } } - let left = n - total; let missing = match self.heads.get(actor_id) { Some(our_head) => { if head > our_head { - let new_head = cmp::min(*our_head + left, *head); - Some((*our_head + 1)..=new_head) + Some((*our_head + 1)..=*head) } else { None } } - None => { - let new_head = Version(cmp::min(head.0, left)); - Some(Version(1)..=new_head) - } + None => Some(Version(1)..=*head), }; if let Some(missing) = missing { @@ -333,17 +329,18 @@ impl SyncStateV1 { } } - missing.into_iter().for_each(|v| { - total += v.end().0 - v.start().0 + 1; - needs - .entry(*actor_id) - .or_default() - .push(SyncNeedV1::Full { versions: v }); - }); - } + for v in missing { + let left = n - total - 1; + let new_end = cmp::min(*v.start() + left, *v.end()); + needs.entry(*actor_id).or_default().push(SyncNeedV1::Full { + versions: *v.start()..=new_end, + }); - if total >= n { - return needs; + total += new_end.0 - v.start().0 + 1; + if total >= n { + return needs; + } + } } } @@ -646,86 +643,99 @@ mod tests { // TODO: this test occasionally fails because get_n_needs loops over a HashMap which is // unordered. This could probably be fixed by using a BTreeMap instead - // #[test] - // fn test_get_n_needs() -> eyre::Result<()> { - // let mut original_state: SyncStateV1 = SyncStateV1::default(); - // - // // static strings so the order is predictible - // let actor1 = ActorId(Uuid::parse_str("adea794c-8bb8-4ca6-b04b-87ec22348326").unwrap()); - // let actor2 = ActorId(Uuid::parse_str("0dea794c-8bb8-4ca6-b04b-87ec22348326").unwrap()); - // - // let mut actor1_state = SyncStateV1::default(); - // actor1_state.heads.insert(actor1, Version(20)); - // let expected = HashMap::from([( - // actor1, - // vec![SyncNeedV1::Full { - // versions: Version(1)..=Version(10), - // }], - // )]); - // assert_eq!(original_state.get_n_needs(&actor1_state, 10), expected); - // - // let mut actor1_state = SyncStateV1::default(); - // actor1_state.heads.insert(actor1, Version(8)); - // actor1_state.heads.insert(actor2, Version(20)); - // let got = original_state.get_n_needs(&actor1_state, 10); - // let expected = HashMap::from([ - // ( - // actor1, - // vec![SyncNeedV1::Full { - // versions: Version(1)..=Version(8), - // }], - // ), - // ( - // actor2, - // vec![SyncNeedV1::Full { - // versions: Version(1)..=Version(2), - // }], - // ), - // ]); - // assert_eq!(got, expected); - // - // let mut actor1_state = SyncStateV1::default(); - // actor1_state.heads.insert(actor1, Version(30)); - // actor1_state.partial_need.insert( - // actor1, - // HashMap::from([(Version(13), vec![CrsqlSeq(20)..=CrsqlSeq(25)])]), - // ); - // actor1_state - // .need - // .insert(actor1, vec![Version(21)..=Version(24)]); - // - // original_state.heads.insert(actor1, Version(10)); - // original_state - // .need - // .insert(actor1, vec![Version(4)..=Version(5)]); - // original_state.partial_need.insert( - // actor1, - // HashMap::from([(Version(9), vec![CrsqlSeq(1)..=CrsqlSeq(10)])]), - // ); - // - // let got = original_state.get_n_needs(&actor1_state, 10); - // let expected = HashMap::from([( - // actor1, - // vec![ - // SyncNeedV1::Full { - // versions: Version(4)..=Version(5), - // }, - // SyncNeedV1::Partial { - // version: Version(9), - // seqs: vec![CrsqlSeq(1)..=CrsqlSeq(10)], - // }, - // SyncNeedV1::Full { - // versions: Version(11)..=Version(12), - // }, - // SyncNeedV1::Full { - // versions: Version(14)..=Version(17), - // }, - // ], - // )]); - // assert_eq!(got, expected); - // - // Ok(()) - // } + #[test] + fn test_get_n_needs() { + let mut original_state: SyncStateV1 = SyncStateV1::default(); + + // static strings so the order is predictible + let actor1 = ActorId(Uuid::parse_str("0dea794c-8bb8-4ca6-b04b-87ec22348326").unwrap()); + let actor2 = ActorId(Uuid::parse_str("adea794c-8bb8-4ca6-b04b-87ec22348326").unwrap()); + + let mut actor1_state = SyncStateV1::default(); + actor1_state.heads.insert(actor1, Version(20)); + let expected = HashMap::from([( + actor1, + vec![SyncNeedV1::Full { + versions: Version(1)..=Version(10), + }], + )]); + assert_eq!(original_state.get_n_needs(&actor1_state, 10), expected); + + let mut actor1_state = SyncStateV1::default(); + actor1_state.heads.insert(actor1, Version(8)); + actor1_state.heads.insert(actor2, Version(20)); + let got = original_state.get_n_needs(&actor1_state, 10); + let expected = HashMap::from([ + ( + actor1, + vec![SyncNeedV1::Full { + versions: Version(1)..=Version(8), + }], + ), + ( + actor2, + vec![SyncNeedV1::Full { + versions: Version(1)..=Version(2), + }], + ), + ]); + assert_eq!(got, expected); + + let mut actor1_state = SyncStateV1::default(); + actor1_state.heads.insert(actor1, Version(30)); + actor1_state.partial_need.insert( + actor1, + HashMap::from([(Version(13), vec![CrsqlSeq(20)..=CrsqlSeq(25)])]), + ); + actor1_state + .need + .insert(actor1, vec![Version(21)..=Version(24)]); + + original_state.heads.insert(actor1, Version(10)); + original_state + .need + .insert(actor1, vec![Version(4)..=Version(5)]); + original_state.partial_need.insert( + actor1, + HashMap::from([(Version(9), vec![CrsqlSeq(1)..=CrsqlSeq(10)])]), + ); + + let got = original_state.get_n_needs(&actor1_state, 10); + let expected = HashMap::from([( + actor1, + vec![ + SyncNeedV1::Full { + versions: Version(4)..=Version(5), + }, + SyncNeedV1::Partial { + version: Version(9), + seqs: vec![CrsqlSeq(1)..=CrsqlSeq(10)], + }, + SyncNeedV1::Full { + versions: Version(11)..=Version(12), + }, + SyncNeedV1::Full { + versions: Version(14)..=Version(18), + }, + ], + )]); + assert_eq!(got, expected); + + let mut actor1_state = SyncStateV1::default(); + actor1_state.heads.insert(actor1, Version(30)); + original_state.heads.insert(actor1, Version(30)); + original_state + .need + .insert(actor1, vec![Version(4)..=Version(20)]); + let got = original_state.get_n_needs(&actor1_state, 10); + let expected = HashMap::from([( + actor1, + vec![SyncNeedV1::Full { + versions: Version(4)..=Version(13), + }], + )]); + assert_eq!(got, expected); + } #[test] fn test_merge_need() { From 7716b647d0a7330fa0c28ed65e97f94b691e4dff Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Tue, 27 Aug 2024 16:59:08 +0100 Subject: [PATCH 06/10] add log about how long calculating needs took --- crates/corro-agent/src/api/peer.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 5d915eb0..3949c15b 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1153,7 +1153,9 @@ pub async fn parallel_sync( let len = syncers.len(); let actor_state: Vec<_> = syncers.iter().map(|x| (x.0, x.2.clone())).collect(); + let compute_start = Instant::now(); let actor_needs = distribute_available_needs(our_sync_state, actor_state); + info!("took {:?} to compute needs from other actors", compute_start.elapsed()); let (readers, mut servers) = { let mut rng = rand::thread_rng(); From c44424c8b047b40ca070935853bd9171e7b4c7cb Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Tue, 27 Aug 2024 20:35:15 +0100 Subject: [PATCH 07/10] try alt func --- crates/corro-agent/src/api/peer.rs | 100 ++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 3949c15b..ab4631b3 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -942,6 +942,16 @@ async fn process_sync( Ok(()) } +fn chunk_range + std::cmp::Ord + Copy>( + range: RangeInclusive, + chunk_size: usize, +) -> impl Iterator> { + range.clone().step_by(chunk_size).map(move |block_start| { + let block_end = (block_start + chunk_size as u64).min(*range.end()); + block_start..=block_end + }) +} + fn encode_sync_msg( codec: &mut LengthDelimitedCodec, encode_buf: &mut BytesMut, @@ -1154,7 +1164,7 @@ pub async fn parallel_sync( let len = syncers.len(); let actor_state: Vec<_> = syncers.iter().map(|x| (x.0, x.2.clone())).collect(); let compute_start = Instant::now(); - let actor_needs = distribute_available_needs(our_sync_state, actor_state); + let actor_needs = distribute_available_needs2(our_sync_state, actor_state); info!("took {:?} to compute needs from other actors", compute_start.elapsed()); let (readers, mut servers) = { @@ -1471,6 +1481,94 @@ pub fn distribute_available_needs( final_needs } +pub fn distribute_available_needs2( + our_state: SyncStateV1, + states: Vec<(ActorId, SyncStateV1)>, +) -> HashMap> { + let mut final_needs: HashMap> = HashMap::new(); + let states_map: HashMap<_, _> = states.clone().into_iter().collect(); + let actors: Vec<_> = states.iter().map(|(id, _)| *id).collect(); + let mut our_state = our_state; + for (actor_id, state) in states { + let actor_needs = our_state.compute_available_needs(&state); + for (needs_actor, needs) in actor_needs.clone() { + for need in needs { + match need { + SyncNeedV1::Full { versions } => { + let chunks: Vec<_> = crate::api::peer::chunk_range(versions, 10).collect(); + let mut actors: Vec<_> = actors.clone(); + + for i in 0..chunks.len() { + let rest: Vec<(_, _)> = chunks[i..].iter().map(|c| ( + needs_actor, + SyncNeedV1::Full { + versions: c.clone(), + }) + ).collect(); + if actors.len() <= 1 { + final_needs.entry(actor_id).or_default().extend_from_slice(&rest); + break + } + + let mut remove_actor = vec![]; + let version_actors: Vec<_> = actors + .clone() + .into_iter() + .filter(|x| { + let state = states_map + .get(x) + .cloned() + .unwrap_or_default(); + + if state.heads.get(&needs_actor).unwrap_or(&Default::default()) + < chunks[i].start() { + remove_actor.push(*x); + } + + state.contains(&needs_actor, &chunks[i]) + }) + .collect(); + + let least = get_actor_min(&version_actors, &final_needs); + let least = least.unwrap_or(actor_id); + + final_needs.entry(least).or_default().push(( + needs_actor, + SyncNeedV1::Full { + versions: chunks[i].clone(), + }, + )); + println!("actor {least:?} - adding need {:?}", chunks[i]); + actors.retain(|x| !remove_actor.contains(x)); + } + } + _ => { + final_needs + .entry(actor_id) + .or_default() + .push((needs_actor, need)); + } + } + } + } + our_state.merge_needs(&actor_needs); + } + + final_needs +} + +pub fn get_actor_min( + actors: &Vec, + cur_needs: &HashMap>, +) -> Option { + let min = actors + .iter() + .min_by_key(|x| cur_needs.get(x).unwrap_or(&vec![]).len()); + + min.cloned() +} + + #[tracing::instrument(skip(agent, bookie, their_actor_id, read, write), fields(actor_id = %their_actor_id), err)] pub async fn serve_sync( agent: &Agent, From 634699d310b698b83aeb0fbe046c983ed4ac6111 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Tue, 27 Aug 2024 20:51:21 +0100 Subject: [PATCH 08/10] remove spamming log --- crates/corro-agent/src/api/peer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index ab4631b3..31feb7ec 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1538,7 +1538,6 @@ pub fn distribute_available_needs2( versions: chunks[i].clone(), }, )); - println!("actor {least:?} - adding need {:?}", chunks[i]); actors.retain(|x| !remove_actor.contains(x)); } } From 65cc804688b72cd71a71748148efe8413b3a29dd Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Wed, 28 Aug 2024 11:16:24 +0100 Subject: [PATCH 09/10] get needs before sending to spread out computation --- crates/corro-agent/src/api/peer.rs | 256 +++++------------------------ 1 file changed, 37 insertions(+), 219 deletions(-) diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 31feb7ec..d603a2a1 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1,5 +1,5 @@ use std::cmp; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap}; use std::net::SocketAddr; use std::ops::RangeInclusive; use std::sync::Arc; @@ -23,7 +23,6 @@ use futures::{Future, Stream, TryFutureExt, TryStreamExt}; use itertools::Itertools; use metrics::counter; use quinn::{RecvStream, SendStream}; -use rand::seq::SliceRandom; use rangemap::{RangeInclusiveMap, RangeInclusiveSet}; use rusqlite::{named_params, Connection}; use speedy::Writable; @@ -942,16 +941,6 @@ async fn process_sync( Ok(()) } -fn chunk_range + std::cmp::Ord + Copy>( - range: RangeInclusive, - chunk_size: usize, -) -> impl Iterator> { - range.clone().step_by(chunk_size).map(move |block_start| { - let block_end = (block_start + chunk_size as u64).min(*range.end()); - block_start..=block_end - }) -} - fn encode_sync_msg( codec: &mut LengthDelimitedCodec, encode_buf: &mut BytesMut, @@ -1038,7 +1027,7 @@ pub async fn parallel_sync( agent: &Agent, transport: &Transport, members: Vec<(ActorId, SocketAddr)>, - our_sync_state: SyncStateV1, + mut our_sync_state: SyncStateV1, our_empty_ts: HashMap>, ) -> Result { trace!( @@ -1162,51 +1151,17 @@ pub async fn parallel_sync( })?; let len = syncers.len(); - let actor_state: Vec<_> = syncers.iter().map(|x| (x.0, x.2.clone())).collect(); - let compute_start = Instant::now(); - let actor_needs = distribute_available_needs2(our_sync_state, actor_state); - info!("took {:?} to compute needs from other actors", compute_start.elapsed()); let (readers, mut servers) = { - let mut rng = rand::thread_rng(); syncers.into_iter().fold( (Vec::with_capacity(len), Vec::with_capacity(len)), |(mut readers, mut servers), (actor_id, addr, state, tx, read)| { - let mut needs = actor_needs.get(&actor_id).cloned().unwrap_or_default(); - - let cleared_ts = state.last_cleared_ts; - if let Some(ts) = cleared_ts { - if let Some(last_seen) = our_empty_ts.get(&actor_id) { - if last_seen.is_none() || last_seen.unwrap() < ts { - debug!(%actor_id, "got last cleared ts {cleared_ts:?} - out last_seen {last_seen:?}"); - needs.push((actor_id, SyncNeedV1::Empty { ts: *last_seen })); - } - } - } - - if needs.is_empty() { - trace!(%actor_id, "no needs!"); - return (readers, servers); - } readers.push((actor_id, read)); - trace!(%actor_id, "needs: {needs:?}"); - - debug!(%actor_id, %addr, "needs len: {}", needs.iter().map(|(_, need)| match need { - SyncNeedV1::Full {versions} => (versions.end().0 - versions.start().0) as usize + 1, - SyncNeedV1::Partial {..} => 0, - SyncNeedV1::Empty {..} => 0, - }).sum::()); - - needs.shuffle(&mut rng); - - - let actor_needs = needs.into_iter().collect::>(); - servers.push(( actor_id, addr, - actor_needs, + state, tx, )); @@ -1225,12 +1180,6 @@ pub async fn parallel_sync( let mut send_buf = BytesMut::new(); let mut encode_buf = BytesMut::new(); - // already requested full versions - let mut req_full: HashMap> = HashMap::new(); - - // already requested partial version sequences - let mut req_partials: HashMap<(ActorId, Version), RangeInclusiveSet> = HashMap::new(); - let start = Instant::now(); loop { @@ -1238,90 +1187,27 @@ pub async fn parallel_sync( break; } let mut next_servers = Vec::with_capacity(servers.len()); - 'servers: for (server_actor_id, addr, mut needs, mut tx) in servers { - if needs.is_empty() { - continue; - } - - let mut drained = 0; - - while drained < 10 { - let (actor_id, need) = match needs.pop_front() { - Some(popped) => popped, - None => { - break; - } - }; - - drained += 1; - - let actual_needs = match need { - SyncNeedV1::Full { versions } => { - let range = req_full.entry(actor_id).or_default(); - - let mut new_versions = - RangeInclusiveSet::from_iter([versions.clone()].into_iter()); - - // check if we've already requested - for overlap in range.overlapping(&versions) { - new_versions.remove(overlap.clone()); - } - - if new_versions.is_empty() { - continue; - } - - new_versions - .into_iter() - .map(|versions| { - range.remove(versions.clone()); - SyncNeedV1::Full { versions } - }) - .collect() - } - SyncNeedV1::Partial { version, seqs } => { - let range = req_partials.entry((actor_id, version)).or_default(); - let mut new_seqs = - RangeInclusiveSet::from_iter(seqs.clone().into_iter()); - - for seqs in seqs { - for overlap in range.overlapping(&seqs) { - new_seqs.remove(overlap.clone()); - } - } - - if new_seqs.is_empty() { - continue; - } - - vec![SyncNeedV1::Partial { - version, - seqs: new_seqs - .into_iter() - .map(|seqs| { - range.remove(seqs.clone()); - seqs - }) - .collect(), - }] - } - need => {vec![need]}, - }; - - if actual_needs.is_empty() { - warn!(%server_actor_id, %actor_id, %addr, "nothing to send!"); - continue; + 'servers: for (server_actor_id, addr, state, mut tx) in servers { + let mut last_need: HashMap> = HashMap::new(); + for _ in 0..10 { + let needs = our_sync_state.get_n_needs(&state, 10); + last_need = needs.clone(); + if needs.is_empty() { + break; } - let req_len = actual_needs.len(); + our_sync_state.merge_needs(&needs); + + let needs = Vec::from_iter(needs.into_iter()); + let req_len = needs.len(); if let Err(e) = encode_sync_msg( &mut codec, &mut encode_buf, &mut send_buf, - SyncMessage::V1(SyncMessageV1::Request(vec![(actor_id, actual_needs)])), + SyncMessage::V1(SyncMessageV1::Request(needs)), ) { - error!(%server_actor_id, %actor_id, %addr, "could not encode sync request: {e} (elapsed: {:?})", start.elapsed()); + error!(%server_actor_id, %server_actor_id, %addr, "could not encode sync request: {e} (elapsed: {:?})", start.elapsed()); continue 'servers; } @@ -1338,7 +1224,26 @@ pub async fn parallel_sync( tokio::task::yield_now().await; } - if needs.is_empty() { + if last_need.is_empty() { + if let Some(ts) = state.last_cleared_ts { + if let Some(last_seen) = our_empty_ts.get(&server_actor_id) { + if last_seen.is_none() || last_seen.unwrap() < ts { + if let Err(e) = encode_sync_msg( + &mut codec, + &mut encode_buf, + &mut send_buf, + SyncMessage::V1(SyncMessageV1::Request(vec![(server_actor_id, vec![SyncNeedV1::Empty { ts: *last_seen }])])), + ) { + error!(%server_actor_id, %server_actor_id, %addr, "could not encode sync request: {e} (elapsed: {:?})", start.elapsed()); + } else { + if let Err(e) = write_buf(&mut send_buf, &mut tx).await { + error!(%server_actor_id, %addr, "could not write sync requests: {e} (elapsed: {:?})", start.elapsed()); + } + } + } + } + } + if let Err(e) = tx.finish().instrument(info_span!("quic_finish")).await { warn!("could not finish stream while sending sync requests: {e}"); } @@ -1346,7 +1251,7 @@ pub async fn parallel_sync( continue; } - next_servers.push((server_actor_id, addr, needs, tx)); + next_servers.push((server_actor_id, addr, state, tx)); } servers = next_servers; } @@ -1481,93 +1386,6 @@ pub fn distribute_available_needs( final_needs } -pub fn distribute_available_needs2( - our_state: SyncStateV1, - states: Vec<(ActorId, SyncStateV1)>, -) -> HashMap> { - let mut final_needs: HashMap> = HashMap::new(); - let states_map: HashMap<_, _> = states.clone().into_iter().collect(); - let actors: Vec<_> = states.iter().map(|(id, _)| *id).collect(); - let mut our_state = our_state; - for (actor_id, state) in states { - let actor_needs = our_state.compute_available_needs(&state); - for (needs_actor, needs) in actor_needs.clone() { - for need in needs { - match need { - SyncNeedV1::Full { versions } => { - let chunks: Vec<_> = crate::api::peer::chunk_range(versions, 10).collect(); - let mut actors: Vec<_> = actors.clone(); - - for i in 0..chunks.len() { - let rest: Vec<(_, _)> = chunks[i..].iter().map(|c| ( - needs_actor, - SyncNeedV1::Full { - versions: c.clone(), - }) - ).collect(); - if actors.len() <= 1 { - final_needs.entry(actor_id).or_default().extend_from_slice(&rest); - break - } - - let mut remove_actor = vec![]; - let version_actors: Vec<_> = actors - .clone() - .into_iter() - .filter(|x| { - let state = states_map - .get(x) - .cloned() - .unwrap_or_default(); - - if state.heads.get(&needs_actor).unwrap_or(&Default::default()) - < chunks[i].start() { - remove_actor.push(*x); - } - - state.contains(&needs_actor, &chunks[i]) - }) - .collect(); - - let least = get_actor_min(&version_actors, &final_needs); - let least = least.unwrap_or(actor_id); - - final_needs.entry(least).or_default().push(( - needs_actor, - SyncNeedV1::Full { - versions: chunks[i].clone(), - }, - )); - actors.retain(|x| !remove_actor.contains(x)); - } - } - _ => { - final_needs - .entry(actor_id) - .or_default() - .push((needs_actor, need)); - } - } - } - } - our_state.merge_needs(&actor_needs); - } - - final_needs -} - -pub fn get_actor_min( - actors: &Vec, - cur_needs: &HashMap>, -) -> Option { - let min = actors - .iter() - .min_by_key(|x| cur_needs.get(x).unwrap_or(&vec![]).len()); - - min.cloned() -} - - #[tracing::instrument(skip(agent, bookie, their_actor_id, read, write), fields(actor_id = %their_actor_id), err)] pub async fn serve_sync( agent: &Agent, From 74e543501cde7d49a380fb8fb358f3f62f4bc9cc Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Wed, 28 Aug 2024 13:08:39 +0100 Subject: [PATCH 10/10] increase queue len --- crates/corro-types/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/corro-types/src/config.rs b/crates/corro-types/src/config.rs index 7681bfb9..cf82894d 100644 --- a/crates/corro-types/src/config.rs +++ b/crates/corro-types/src/config.rs @@ -15,7 +15,7 @@ const fn default_wal_threshold() -> usize { 10 } const fn default_processing_queue() -> usize { - 100000 + 500000 } /// Used for the apply channel