diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 5c590e08..c13d3ff3 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1153,7 +1153,7 @@ pub async fn parallel_sync( let len = syncers.len(); let actor_state: Vec<_> = syncers.iter().map(|x| (x.0, x.2.clone())).collect(); - let actor_needs = get_needs(our_sync_state, actor_state); + let actor_needs = distribute_available_needs(our_sync_state, actor_state); let (readers, mut servers) = { let mut rng = rand::thread_rng(); @@ -1428,14 +1428,12 @@ pub async fn parallel_sync( .sum::()) } -pub fn get_needs( - our_state: SyncStateV1, - states: Vec<(ActorId, SyncStateV1)>, +pub fn distribute_available_needs( + mut our_state: SyncStateV1, + mut states: Vec<(ActorId, SyncStateV1)>, ) -> HashMap> { let mut final_needs: HashMap> = HashMap::new(); - let mut our_state = our_state; - let mut states = states; while !states.is_empty() { let mut remove_keys = vec![]; for (actor_id, state) in &states { @@ -1752,7 +1750,7 @@ mod tests { actor2_state.heads.insert(actor1, Version(60)); actor2_state.heads.insert(actor2, Version(20)); - let needs_map = get_needs( + let needs_map = distribute_available_needs( original_state.clone(), vec![ (actor1, actor1_state.clone()), @@ -1772,7 +1770,7 @@ mod tests { .need .insert(actor3, vec![(Version(3)..=Version(20))]); - let needs_map = get_needs( + let needs_map = distribute_available_needs( original_state, vec![ (actor1, actor1_state), diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 8465e908..7a63caed 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -696,7 +696,6 @@ mod tests { CrsqlSeq(20)..=CrsqlSeq(25), ]); state.merge_needs(&needs); - println!("state: {state:?}"); assert!(state .partial_need .get(&actor1) @@ -711,6 +710,20 @@ mod tests { .get(&Version(40)) .unwrap() .contains(&(CrsqlSeq(20)..=CrsqlSeq(21)))); + + let mut needs= HashMap::new(); + needs.insert(actor1, vec![SyncNeedV1::Partial { + version: Version(40), + seqs: vec![CrsqlSeq(1)..=CrsqlSeq(10)], + }, SyncNeedV1::Partial { + version: Version(40), + seqs: vec![CrsqlSeq(20)..=CrsqlSeq(21)], + }]); + state.merge_needs(&needs); + assert!(state + .partial_need + .get(&actor1) + .unwrap().is_empty()); } #[test]