Skip to content

Commit

Permalink
fix some functions
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Aug 25, 2024
1 parent c5eb703 commit c2d17ee
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
14 changes: 6 additions & 8 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ pub async fn parallel_sync(

let len = syncers.len();
let actor_state: Vec<_> = syncers.iter().map(|x| (x.0, x.2.clone())).collect();
let actor_needs = get_needs(our_sync_state, actor_state);
let actor_needs = distribute_available_needs(our_sync_state, actor_state);

let (readers, mut servers) = {
let mut rng = rand::thread_rng();
Expand Down Expand Up @@ -1428,14 +1428,12 @@ pub async fn parallel_sync(
.sum::<usize>())
}

pub fn get_needs(
our_state: SyncStateV1,
states: Vec<(ActorId, SyncStateV1)>,
pub fn distribute_available_needs(
mut our_state: SyncStateV1,
mut states: Vec<(ActorId, SyncStateV1)>,
) -> HashMap<ActorId, Vec<(ActorId, SyncNeedV1)>> {
let mut final_needs: HashMap<ActorId, Vec<(ActorId, SyncNeedV1)>> = HashMap::new();

let mut our_state = our_state;
let mut states = states;
while !states.is_empty() {
let mut remove_keys = vec![];
for (actor_id, state) in &states {
Expand Down Expand Up @@ -1752,7 +1750,7 @@ mod tests {
actor2_state.heads.insert(actor1, Version(60));
actor2_state.heads.insert(actor2, Version(20));

let needs_map = get_needs(
let needs_map = distribute_available_needs(
original_state.clone(),
vec![
(actor1, actor1_state.clone()),
Expand All @@ -1772,7 +1770,7 @@ mod tests {
.need
.insert(actor3, vec![(Version(3)..=Version(20))]);

let needs_map = get_needs(
let needs_map = distribute_available_needs(
original_state,
vec![
(actor1, actor1_state),
Expand Down
15 changes: 14 additions & 1 deletion crates/corro-types/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,6 @@ mod tests {
CrsqlSeq(20)..=CrsqlSeq(25),
]);
state.merge_needs(&needs);
println!("state: {state:?}");
assert!(state
.partial_need
.get(&actor1)
Expand All @@ -711,6 +710,20 @@ mod tests {
.get(&Version(40))
.unwrap()
.contains(&(CrsqlSeq(20)..=CrsqlSeq(21))));

let mut needs= HashMap::new();
needs.insert(actor1, vec![SyncNeedV1::Partial {
version: Version(40),
seqs: vec![CrsqlSeq(1)..=CrsqlSeq(10)],
}, SyncNeedV1::Partial {
version: Version(40),
seqs: vec![CrsqlSeq(20)..=CrsqlSeq(21)],
}]);
state.merge_needs(&needs);
assert!(state
.partial_need
.get(&actor1)
.unwrap().is_empty());
}

#[test]
Expand Down

0 comments on commit c2d17ee

Please sign in to comment.