Skip to content

Commit

Permalink
working function
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Aug 26, 2024
1 parent 8617910 commit be67362
Show file tree
Hide file tree
Showing 5 changed files with 452 additions and 76 deletions.
5 changes: 2 additions & 3 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -988,12 +988,11 @@ pub async fn process_multiple_changes(

if let Some(ts) = last_cleared {
let mut booked_writer = agent
.booked()
.blocking_write("process_multiple_changes(update_cleared_ts)");
.booked()
.blocking_write("process_multiple_changes(update_cleared_ts)");
booked_writer.update_cleared_ts(ts);
}


for (_, changeset, _, _) in changesets.iter() {
if let Some(ts) = changeset.ts() {
let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration();
Expand Down
157 changes: 103 additions & 54 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,16 +942,6 @@ async fn process_sync(
Ok(())
}

/// Splits `range` into consecutive, non-overlapping inclusive sub-ranges that
/// each cover at most `chunk_size` values.
///
/// The final chunk is clamped to `range.end()` and may therefore be shorter.
/// An empty `range` yields no chunks.
///
/// # Panics
///
/// Panics if `chunk_size` is 0, because `Iterator::step_by` rejects a zero step.
fn chunk_range<T: std::iter::Step + std::ops::Add<u64, Output = T> + std::cmp::Ord + Copy>(
    range: RangeInclusive<T>,
    chunk_size: usize,
) -> impl Iterator<Item = RangeInclusive<T>> {
    let last = *range.end();
    // Both bounds are inclusive, so a chunk holding `chunk_size` values ends
    // `chunk_size - 1` past its start. Using `chunk_size` here would make every
    // chunk share its last value with the start of the next one.
    // `saturating_sub` only keeps this line from underflowing when
    // `chunk_size == 0`; `step_by` below still panics in that case.
    let span = chunk_size.saturating_sub(1) as u64;

    range.step_by(chunk_size).map(move |block_start| {
        let block_end = (block_start + span).min(last);
        block_start..=block_end
    })
}

fn encode_sync_msg(
codec: &mut LengthDelimitedCodec,
encode_buf: &mut BytesMut,
Expand Down Expand Up @@ -1127,38 +1117,23 @@ pub async fn parallel_sync(

counter!("corro.sync.client.member", "id" => actor_id.to_string(), "addr" => addr.to_string()).increment(1);

let mut needs = our_sync_state.compute_available_needs(&their_sync_state);

trace!(%actor_id, self_actor_id = %agent.actor_id(), "computed needs");

let cleared_ts = their_sync_state.last_cleared_ts;

if let Some(ts) = cleared_ts {
if let Some(last_seen) = our_empty_ts.get(&actor_id) {
if last_seen.is_none() || last_seen.unwrap() < ts {
debug!(%actor_id, "got last cleared ts {cleared_ts:?} - out last_seen {last_seen:?}");
needs.entry(actor_id).or_default().push( SyncNeedV1::Empty { ts: *last_seen });
}
}
}

Ok::<_, SyncError>((needs, tx, read))
Ok::<_, SyncError>((their_sync_state, tx, read))
}.await
)
}.instrument(info_span!("sync_client_handshake", %actor_id, %addr))
}))
.collect::<Vec<(ActorId, SocketAddr, Result<_, SyncError>)>>()
.await;

debug!("collected member needs and such!");
debug!("collected member state and such!");

#[allow(clippy::manual_try_fold)]
let syncers = results
.into_iter()
.fold(Ok(vec![]), |agg, (actor_id, addr, res)| match res {
Ok((needs, tx, read)) => {
Ok((state, tx, read)) => {
let mut v = agg.unwrap_or_default();
v.push((actor_id, addr, needs, tx, read));
v.push((actor_id, addr, state, tx, read));
Ok(v)
}
Err(e) => {
Expand All @@ -1177,12 +1152,26 @@ pub async fn parallel_sync(
})?;

let len = syncers.len();
let actor_state: Vec<_> = syncers.iter().map(|x| (x.0, x.2.clone())).collect();
let actor_needs = distribute_available_needs(our_sync_state, actor_state);

let (readers, mut servers) = {
let mut rng = rand::thread_rng();
syncers.into_iter().fold(
(Vec::with_capacity(len), Vec::with_capacity(len)),
|(mut readers, mut servers), (actor_id, addr, needs, tx, read)| {
|(mut readers, mut servers), (actor_id, addr, state, tx, read)| {
let mut needs = actor_needs.get(&actor_id).cloned().unwrap_or_default();

let cleared_ts = state.last_cleared_ts;
if let Some(ts) = cleared_ts {
if let Some(last_seen) = our_empty_ts.get(&actor_id) {
if last_seen.is_none() || last_seen.unwrap() < ts {
debug!(%actor_id, "got last cleared ts {cleared_ts:?} - out last_seen {last_seen:?}");
needs.push((actor_id, SyncNeedV1::Empty { ts: *last_seen }));
}
}
}

if needs.is_empty() {
trace!(%actor_id, "no needs!");
return (readers, servers);
Expand All @@ -1191,36 +1180,16 @@ pub async fn parallel_sync(

trace!(%actor_id, "needs: {needs:?}");

debug!(%actor_id, %addr, "needs len: {}", needs.values().map(|needs| needs.iter().map(|need| match need {
debug!(%actor_id, %addr, "needs len: {}", needs.iter().map(|(_, need)| match need {
SyncNeedV1::Full {versions} => (versions.end().0 - versions.start().0) as usize + 1,
SyncNeedV1::Partial {..} => 0,
SyncNeedV1::Empty {..} => 0,
}).sum::<usize>()).sum::<usize>());
}).sum::<usize>());

let actor_needs = needs
.into_iter()
.flat_map(|(actor_id, needs)| {
let mut needs: Vec<_> = needs
.into_iter()
.flat_map(|need| match need {
// chunk the versions, sometimes it's 0..=1000000 and that's far too big for a chunk!
SyncNeedV1::Full { versions } => chunk_range(versions, 10)
.map(|versions| SyncNeedV1::Full { versions })
.collect(),

need => vec![need],
})
.collect();
needs.shuffle(&mut rng);

// NOTE: IMPORTANT! shuffle the vec so we don't keep looping over the same later on
needs.shuffle(&mut rng);

needs
.into_iter()
.map(|need| (actor_id, need))
.collect::<Vec<_>>()
})
.collect::<VecDeque<_>>();
let actor_needs = needs.into_iter().collect::<VecDeque<_>>();

servers.push((
actor_id,
Expand Down Expand Up @@ -1466,6 +1435,40 @@ pub async fn parallel_sync(
.sum::<usize>())
}

/// Splits the needs we can currently request across the given peers.
///
/// Makes repeated passes over `states`. On each pass every remaining peer is
/// queried with `our_state.get_n_needs(peer_state, 10)`; a non-empty batch is
/// merged back into `our_state` via `merge_needs` and appended to that peer's
/// entry in the result before the next peer is queried. A peer whose batch
/// comes back empty is dropped at the end of the pass, and the loop finishes
/// once no peers remain.
///
/// Returns a map from peer actor id to the `(actor, need)` pairs assigned to
/// that peer. Peers that never produced a batch have no entry.
///
/// NOTE(review): termination relies on `merge_needs` eventually making
/// `get_n_needs` return an empty batch for every peer — confirm against
/// `SyncStateV1`.
pub fn distribute_available_needs(
    mut our_state: SyncStateV1,
    mut states: Vec<(ActorId, SyncStateV1)>,
) -> HashMap<ActorId, Vec<(ActorId, SyncNeedV1)>> {
    let mut final_needs: HashMap<ActorId, Vec<(ActorId, SyncNeedV1)>> = HashMap::new();

    while !states.is_empty() {
        // Peers that had nothing left to offer during this pass.
        let mut exhausted = vec![];

        for (actor_id, state) in &states {
            // presumably the `10` caps how many needs are taken per peer per
            // pass, so the work is spread round-robin — TODO confirm
            let actor_needs = our_state.get_n_needs(state, 10);

            // we can get no more needs from this actor
            if actor_needs.is_empty() {
                exhausted.push(*actor_id);
                continue;
            }

            // Merge by reference first so the batch can then be moved into the
            // result without cloning it.
            our_state.merge_needs(&actor_needs);
            final_needs.entry(*actor_id).or_default().extend(
                actor_needs
                    .into_iter()
                    .flat_map(|(actor_id, needs)| {
                        needs.into_iter().map(move |need| (actor_id, need))
                    }),
            );
        }

        states.retain(|(actor, _)| !exhausted.contains(actor));
    }

    final_needs
}

#[tracing::instrument(skip(agent, bookie, their_actor_id, read, write), fields(actor_id = %their_actor_id), err)]
pub async fn serve_sync(
agent: &Agent,
Expand Down Expand Up @@ -1732,13 +1735,59 @@ mod tests {
use rand::{Rng, RngCore};
use tempfile::TempDir;
use tripwire::Tripwire;
use uuid::Uuid;

use crate::{
agent::{process_multiple_changes, setup},
api::public::api_v1_db_schema,
};

use super::*;
/// Smoke test for `distribute_available_needs`: runs it against two small peer
/// topologies and prints the resulting assignment. It makes no assertions on
/// the output.
#[test]
fn test_get_needs() -> eyre::Result<()> {
    // Builds a sync state whose `heads` holds exactly the given entries.
    let state_with_heads = |heads: Vec<(ActorId, Version)>| {
        let mut state = SyncStateV1::default();
        for (actor, head) in heads {
            state.heads.insert(actor, head);
        }
        state
    };

    let our_state = SyncStateV1::default();

    let actor1 = ActorId(Uuid::new_v4());
    let actor2 = ActorId(Uuid::new_v4());

    // Round one: both peers advertise identical heads.
    let mut actor1_state = state_with_heads(vec![(actor1, Version(60)), (actor2, Version(20))]);
    let mut actor2_state = state_with_heads(vec![(actor1, Version(60)), (actor2, Version(20))]);

    let first_round = distribute_available_needs(
        our_state.clone(),
        vec![
            (actor1, actor1_state.clone()),
            (actor2, actor2_state.clone()),
        ],
    );
    println!("{:#?}", first_round);

    // Round two: add a third peer that the first two have only partly seen.
    let actor3 = ActorId(Uuid::new_v4());
    let actor3_state = state_with_heads(vec![(actor3, Version(40))]);

    // actor 2 has seen only till Version(20)
    actor2_state.heads.insert(actor3, Version(10));

    // actor 1 has seen up to 40 but has some needs
    actor1_state.heads.insert(actor3, Version(40));
    actor1_state
        .need
        .insert(actor3, vec![Version(3)..=Version(20)]);

    let second_round = distribute_available_needs(
        our_state,
        vec![
            (actor1, actor1_state),
            (actor2, actor2_state),
            (actor3, actor3_state),
        ],
    );
    println!("{:#?}", second_round);

    Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_handle_need() -> eyre::Result<()> {
Expand Down
1 change: 0 additions & 1 deletion crates/corro-types/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,6 @@ impl VersionsSnapshot {
conn.prepare_cached("INSERT OR REPLACE INTO __corro_sync_state VALUES (?, ?)")?
.execute((self.actor_id, ts))?;
}

Ok(())
}

Expand Down
18 changes: 14 additions & 4 deletions crates/corro-types/src/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{cmp, fmt, io, num::NonZeroU32, ops::{Deref, RangeInclusive}, time::Duration};
use std::{
cmp, fmt, io,
num::NonZeroU32,
ops::{Deref, RangeInclusive},
time::Duration,
};

use bytes::{Bytes, BytesMut};
use corro_api_types::{row_to_change, Change};
Expand Down Expand Up @@ -165,9 +170,14 @@ impl Changeset {
/// Determines the estimated resource cost of processing a change.
///
/// * `Empty` costs the number of versions in its range, capped at 20.
/// * `EmptySet` costs the sum over its ranges, each range capped at 20
///   individually.
/// * `Full` costs the number of changes it carries.
pub fn processing_cost(&self) -> usize {
    match self {
        Changeset::Empty { versions, .. } => {
            // `+ 1` because the version range is inclusive on both ends.
            cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)
        }
        Changeset::EmptySet { versions, .. } => versions
            .iter()
            .map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20))
            .sum::<usize>(),
        Changeset::Full { changes, .. } => changes.len(),
    }
}

Expand Down
Loading

0 comments on commit be67362

Please sign in to comment.