Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate needs requested during sync #254

Closed
wants to merge 11 commits into from
5 changes: 2 additions & 3 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -988,12 +988,11 @@ pub async fn process_multiple_changes(

if let Some(ts) = last_cleared {
let mut booked_writer = agent
.booked()
.blocking_write("process_multiple_changes(update_cleared_ts)");
.booked()
.blocking_write("process_multiple_changes(update_cleared_ts)");
booked_writer.update_cleared_ts(ts);
}


for (_, changeset, _, _) in changesets.iter() {
if let Some(ts) = changeset.ts() {
let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration();
Expand Down
157 changes: 103 additions & 54 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,16 +942,6 @@ async fn process_sync(
Ok(())
}

/// Splits an inclusive range into consecutive, non-overlapping inclusive
/// sub-ranges that each cover at most `chunk_size` values.
///
/// The last chunk is clamped to `range.end()`, so it may be shorter than the
/// others. An empty range (`start > end`) yields no chunks.
///
/// Previously every chunk ended at `start + chunk_size` while the next chunk
/// started at that very same value, so neighbouring chunks shared their
/// boundary element (e.g. `0..=10`, `10..=20`). Chunks now end at
/// `start + chunk_size - 1`.
///
/// The bound on `T` no longer includes `std::iter::Step`; every type that
/// satisfied the previous bounds still satisfies the current ones.
///
/// # Panics
///
/// Panics if `chunk_size` is zero.
fn chunk_range<T: std::ops::Add<u64, Output = T> + std::cmp::Ord + Copy>(
    range: RangeInclusive<T>,
    chunk_size: usize,
) -> impl Iterator<Item = RangeInclusive<T>> {
    assert!(chunk_size != 0, "chunk_size must be non-zero");

    let last = *range.end();
    // Distance from the first to the last element of a full chunk.
    let span = chunk_size as u64 - 1;
    let first = (!range.is_empty()).then(|| *range.start());

    std::iter::successors(first, move |&block_start| {
        let block_end = (block_start + span).min(last);
        // Only advance while something is left; `block_end < last` also
        // guarantees that `block_end + 1` cannot step past `last`.
        (block_end < last).then(|| block_end + 1)
    })
    .map(move |block_start| block_start..=(block_start + span).min(last))
}

fn encode_sync_msg(
codec: &mut LengthDelimitedCodec,
encode_buf: &mut BytesMut,
Expand Down Expand Up @@ -1127,38 +1117,23 @@ pub async fn parallel_sync(

counter!("corro.sync.client.member", "id" => actor_id.to_string(), "addr" => addr.to_string()).increment(1);

let mut needs = our_sync_state.compute_available_needs(&their_sync_state);

trace!(%actor_id, self_actor_id = %agent.actor_id(), "computed needs");

let cleared_ts = their_sync_state.last_cleared_ts;

if let Some(ts) = cleared_ts {
if let Some(last_seen) = our_empty_ts.get(&actor_id) {
if last_seen.is_none() || last_seen.unwrap() < ts {
debug!(%actor_id, "got last cleared ts {cleared_ts:?} - out last_seen {last_seen:?}");
needs.entry(actor_id).or_default().push( SyncNeedV1::Empty { ts: *last_seen });
}
}
}

Ok::<_, SyncError>((needs, tx, read))
Ok::<_, SyncError>((their_sync_state, tx, read))
}.await
)
}.instrument(info_span!("sync_client_handshake", %actor_id, %addr))
}))
.collect::<Vec<(ActorId, SocketAddr, Result<_, SyncError>)>>()
.await;

debug!("collected member needs and such!");
debug!("collected member state and such!");

#[allow(clippy::manual_try_fold)]
let syncers = results
.into_iter()
.fold(Ok(vec![]), |agg, (actor_id, addr, res)| match res {
Ok((needs, tx, read)) => {
Ok((state, tx, read)) => {
let mut v = agg.unwrap_or_default();
v.push((actor_id, addr, needs, tx, read));
v.push((actor_id, addr, state, tx, read));
Ok(v)
}
Err(e) => {
Expand All @@ -1177,12 +1152,26 @@ pub async fn parallel_sync(
})?;

let len = syncers.len();
let actor_state: Vec<_> = syncers.iter().map(|x| (x.0, x.2.clone())).collect();
let actor_needs = distribute_available_needs(our_sync_state, actor_state);

let (readers, mut servers) = {
let mut rng = rand::thread_rng();
syncers.into_iter().fold(
(Vec::with_capacity(len), Vec::with_capacity(len)),
|(mut readers, mut servers), (actor_id, addr, needs, tx, read)| {
|(mut readers, mut servers), (actor_id, addr, state, tx, read)| {
let mut needs = actor_needs.get(&actor_id).cloned().unwrap_or_default();

let cleared_ts = state.last_cleared_ts;
if let Some(ts) = cleared_ts {
if let Some(last_seen) = our_empty_ts.get(&actor_id) {
if last_seen.is_none() || last_seen.unwrap() < ts {
debug!(%actor_id, "got last cleared ts {cleared_ts:?} - out last_seen {last_seen:?}");
needs.push((actor_id, SyncNeedV1::Empty { ts: *last_seen }));
}
}
}

if needs.is_empty() {
trace!(%actor_id, "no needs!");
return (readers, servers);
Expand All @@ -1191,36 +1180,16 @@ pub async fn parallel_sync(

trace!(%actor_id, "needs: {needs:?}");

debug!(%actor_id, %addr, "needs len: {}", needs.values().map(|needs| needs.iter().map(|need| match need {
debug!(%actor_id, %addr, "needs len: {}", needs.iter().map(|(_, need)| match need {
SyncNeedV1::Full {versions} => (versions.end().0 - versions.start().0) as usize + 1,
SyncNeedV1::Partial {..} => 0,
SyncNeedV1::Empty {..} => 0,
}).sum::<usize>()).sum::<usize>());
}).sum::<usize>());

let actor_needs = needs
.into_iter()
.flat_map(|(actor_id, needs)| {
let mut needs: Vec<_> = needs
.into_iter()
.flat_map(|need| match need {
// chunk the versions, sometimes it's 0..=1000000 and that's far too big for a chunk!
SyncNeedV1::Full { versions } => chunk_range(versions, 10)
.map(|versions| SyncNeedV1::Full { versions })
.collect(),

need => vec![need],
})
.collect();
needs.shuffle(&mut rng);

// NOTE: IMPORTANT! shuffle the vec so we don't keep looping over the same later on
needs.shuffle(&mut rng);

needs
.into_iter()
.map(|need| (actor_id, need))
.collect::<Vec<_>>()
})
.collect::<VecDeque<_>>();
let actor_needs = needs.into_iter().collect::<VecDeque<_>>();

servers.push((
actor_id,
Expand Down Expand Up @@ -1466,6 +1435,40 @@ pub async fn parallel_sync(
.sum::<usize>())
}

/// Distributes the needs we can request across the given peers.
///
/// Peers are visited in order, pass after pass. On every pass each remaining
/// peer is asked, via `SyncStateV1::get_n_needs(.., 10)`, for a batch of needs
/// it can serve given `our_state`. A non-empty batch is appended to that
/// peer's list in the result and then folded back into `our_state` with
/// `merge_needs`, before the next peer is queried. A peer that returns an
/// empty batch is dropped from further passes. The loop ends once no peers
/// remain.
///
/// NOTE(review): presumably `merge_needs` marks the batch as satisfied so the
/// following `get_n_needs` calls do not hand the same versions to another
/// peer, and `10` caps the batch size — confirm against `SyncStateV1`.
///
/// # Parameters
///
/// * `our_state` - our own sync state; taken by value and mutated locally.
/// * `states` - `(peer actor id, peer sync state)` pairs to spread needs over.
///
/// # Returns
///
/// A map from peer actor id to the `(actor id, need)` pairs assigned to that
/// peer, in the order they were produced. Peers that never yielded a need
/// have no entry.
pub fn distribute_available_needs(
    mut our_state: SyncStateV1,
    mut states: Vec<(ActorId, SyncStateV1)>,
) -> HashMap<ActorId, Vec<(ActorId, SyncNeedV1)>> {
    let mut final_needs: HashMap<ActorId, Vec<(ActorId, SyncNeedV1)>> = HashMap::new();

    while !states.is_empty() {
        // `retain` visits every element exactly once, in order, so this is one
        // pass over the peers that also drops the exhausted ones in place.
        states.retain(|(peer_id, peer_state)| {
            let batch = our_state.get_n_needs(peer_state, 10);
            // we can get no more needs from this actor
            if batch.is_empty() {
                return false;
            }

            // Record the batch in our state first (by reference) so it can be
            // moved into the result afterwards without cloning.
            our_state.merge_needs(&batch);
            final_needs.entry(*peer_id).or_default().extend(
                batch.into_iter().flat_map(|(origin_id, needs)| {
                    needs.into_iter().map(move |need| (origin_id, need))
                }),
            );
            true
        });
    }

    final_needs
}

#[tracing::instrument(skip(agent, bookie, their_actor_id, read, write), fields(actor_id = %their_actor_id), err)]
pub async fn serve_sync(
agent: &Agent,
Expand Down Expand Up @@ -1732,13 +1735,59 @@ mod tests {
use rand::{Rng, RngCore};
use tempfile::TempDir;
use tripwire::Tripwire;
use uuid::Uuid;

use crate::{
agent::{process_multiple_changes, setup},
api::public::api_v1_db_schema,
};

use super::*;
/// Smoke test for `distribute_available_needs`: runs it on two scenarios and
/// prints the resulting distribution. It only checks that the calls complete;
/// nothing is asserted about the output.
#[test]
fn test_get_needs() -> eyre::Result<()> {
    // Our own state is empty in both scenarios.
    let our_state: SyncStateV1 = SyncStateV1::default();

    let actor1 = ActorId(Uuid::new_v4());
    let actor2 = ActorId(Uuid::new_v4());

    // Scenario 1: both peers advertise exactly the same heads.
    let mut actor1_state = SyncStateV1::default();
    let mut actor2_state = SyncStateV1::default();
    for peer_state in [&mut actor1_state, &mut actor2_state] {
        peer_state.heads.insert(actor1, Version(60));
        peer_state.heads.insert(actor2, Version(20));
    }

    let first_round = vec![
        (actor1, actor1_state.clone()),
        (actor2, actor2_state.clone()),
    ];
    let needs_map = distribute_available_needs(our_state.clone(), first_round);
    println!("{:#?}", needs_map);

    // Scenario 2: add a third actor that the first two have seen unevenly.
    let actor3 = ActorId(Uuid::new_v4());
    let mut actor3_state = SyncStateV1::default();
    actor3_state.heads.insert(actor3, Version(40));

    // actor 2 has only seen actor 3 up to Version(10)
    actor2_state.heads.insert(actor3, Version(10));

    // actor 1 has seen actor 3 up to Version(40) but still has needs of its own
    actor1_state.heads.insert(actor3, Version(40));
    actor1_state
        .need
        .insert(actor3, vec![(Version(3)..=Version(20))]);

    let second_round = vec![
        (actor1, actor1_state),
        (actor2, actor2_state),
        (actor3, actor3_state),
    ];
    let needs_map = distribute_available_needs(our_state, second_round);
    println!("{:#?}", needs_map);

    Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_handle_need() -> eyre::Result<()> {
Expand Down
1 change: 0 additions & 1 deletion crates/corro-types/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,6 @@ impl VersionsSnapshot {
conn.prepare_cached("INSERT OR REPLACE INTO __corro_sync_state VALUES (?, ?)")?
.execute((self.actor_id, ts))?;
}

Ok(())
}

Expand Down
18 changes: 14 additions & 4 deletions crates/corro-types/src/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{cmp, fmt, io, num::NonZeroU32, ops::{Deref, RangeInclusive}, time::Duration};
use std::{
cmp, fmt, io,
num::NonZeroU32,
ops::{Deref, RangeInclusive},
time::Duration,
};

use bytes::{Bytes, BytesMut};
use corro_api_types::{row_to_change, Change};
Expand Down Expand Up @@ -165,9 +170,14 @@ impl Changeset {
/// Determines the estimated resource cost of processing a changeset.
///
/// * `Empty` - the number of versions in the range, capped at 20.
/// * `EmptySet` - the sum over all ranges of each range's version count,
///   with every range capped at 20 individually.
/// * `Full` - the number of changes.
///
/// The span is clamped to 19 *before* the cast and the `+ 1`, which gives the
/// same result as `min(span + 1, 20)` but cannot overflow or truncate when the
/// range is extremely wide.
pub fn processing_cost(&self) -> usize {
    match self {
        Changeset::Empty { versions, .. } => {
            cmp::min(versions.end().0 - versions.start().0, 19) as usize + 1
        }
        Changeset::EmptySet { versions, .. } => versions
            .iter()
            .map(|versions| cmp::min(versions.end().0 - versions.start().0, 19) as usize + 1)
            .sum::<usize>(),
        Changeset::Full { changes, .. } => changes.len(),
    }
}

Expand Down
Loading
Loading