Deduplicate needs requested during sync #254

Closed
wants to merge 11 commits into from
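
At a high level, the change drops the ad-hoc dedup bookkeeping in parallel_sync (the req_full / req_partials range sets, chunk_range, and the shuffled VecDeque of needs) in favor of asking SyncStateV1 for small batches via get_n_needs and merging each batch back into our own state with merge_needs, so the same versions are never requested twice. Below is a minimal sketch of that idea, using a simplified SyncState with HashSet<u64> in place of the real rangemap-based state; get_n_needs and merge_needs here are illustrative stand-ins modeled on the diff, not the actual corro-types API.

use std::collections::{HashMap, HashSet};

type ActorId = &'static str;
type Version = u64;

#[derive(Default, Clone)]
struct SyncState {
    // versions we already have, or have already asked some peer for
    have: HashMap<ActorId, HashSet<Version>>,
}

impl SyncState {
    // take up to `n` versions the peer has and we don't
    fn get_n_needs(&self, theirs: &SyncState, n: usize) -> HashMap<ActorId, Vec<Version>> {
        let mut out: HashMap<ActorId, Vec<Version>> = HashMap::new();
        let mut taken = 0;
        for (actor, versions) in &theirs.have {
            for v in versions {
                if taken == n {
                    return out;
                }
                if !self.have.get(actor).map_or(false, |ours| ours.contains(v)) {
                    out.entry(*actor).or_default().push(*v);
                    taken += 1;
                }
            }
        }
        out
    }

    // record a batch as requested so later calls never hand out the same versions
    fn merge_needs(&mut self, needs: &HashMap<ActorId, Vec<Version>>) {
        for (actor, versions) in needs {
            self.have.entry(*actor).or_default().extend(versions.iter().copied());
        }
    }
}

fn main() {
    let mut ours = SyncState::default();
    let mut peer_a = SyncState::default();
    peer_a.have.insert("actor-1", (1u64..=5).collect());
    let peer_b = peer_a.clone();

    // ask peer A for a batch, then fold it into our own state
    let from_a = ours.get_n_needs(&peer_a, 3);
    ours.merge_needs(&from_a);

    // the versions already requested from A are no longer offered for B
    let from_b = ours.get_n_needs(&peer_b, 10);
    let a_versions: HashSet<_> = from_a.values().flatten().collect();
    assert!(from_b.values().flatten().all(|v| !a_versions.contains(v)));
    println!("from A: {from_a:?}, then from B: {from_b:?}");
}

The real implementation works on version and sequence ranges rather than individual versions, but the request/merge cycle is the same one driven by the loop in parallel_sync and by distribute_available_needs.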
5 changes: 2 additions & 3 deletions crates/corro-agent/src/agent/util.rs
@@ -988,12 +988,11 @@ pub async fn process_multiple_changes(

if let Some(ts) = last_cleared {
let mut booked_writer = agent
.booked()
.blocking_write("process_multiple_changes(update_cleared_ts)");
.booked()
.blocking_write("process_multiple_changes(update_cleared_ts)");
booked_writer.update_cleared_ts(ts);
}


for (_, changeset, _, _) in changesets.iter() {
if let Some(ts) = changeset.ts() {
let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration();
233 changes: 77 additions & 156 deletions crates/corro-agent/src/api/peer.rs
@@ -1,5 +1,5 @@
use std::cmp;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap};
use std::net::SocketAddr;
use std::ops::RangeInclusive;
use std::sync::Arc;
@@ -23,7 +23,6 @@ use futures::{Future, Stream, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use metrics::counter;
use quinn::{RecvStream, SendStream};
use rand::seq::SliceRandom;
use rangemap::{RangeInclusiveMap, RangeInclusiveSet};
use rusqlite::{named_params, Connection};
use speedy::Writable;
@@ -942,16 +941,6 @@ async fn process_sync(
Ok(())
}

fn chunk_range<T: std::iter::Step + std::ops::Add<u64, Output = T> + std::cmp::Ord + Copy>(
range: RangeInclusive<T>,
chunk_size: usize,
) -> impl Iterator<Item = RangeInclusive<T>> {
range.clone().step_by(chunk_size).map(move |block_start| {
let block_end = (block_start + chunk_size as u64).min(*range.end());
block_start..=block_end
})
}

fn encode_sync_msg(
codec: &mut LengthDelimitedCodec,
encode_buf: &mut BytesMut,
@@ -1038,7 +1027,7 @@ pub async fn parallel_sync(
agent: &Agent,
transport: &Transport,
members: Vec<(ActorId, SocketAddr)>,
our_sync_state: SyncStateV1,
mut our_sync_state: SyncStateV1,
our_empty_ts: HashMap<ActorId, Option<Timestamp>>,
) -> Result<usize, SyncError> {
trace!(
@@ -1127,38 +1116,23 @@ pub async fn parallel_sync(

counter!("corro.sync.client.member", "id" => actor_id.to_string(), "addr" => addr.to_string()).increment(1);

let mut needs = our_sync_state.compute_available_needs(&their_sync_state);

trace!(%actor_id, self_actor_id = %agent.actor_id(), "computed needs");

let cleared_ts = their_sync_state.last_cleared_ts;

if let Some(ts) = cleared_ts {
if let Some(last_seen) = our_empty_ts.get(&actor_id) {
if last_seen.is_none() || last_seen.unwrap() < ts {
debug!(%actor_id, "got last cleared ts {cleared_ts:?} - out last_seen {last_seen:?}");
needs.entry(actor_id).or_default().push( SyncNeedV1::Empty { ts: *last_seen });
}
}
}

Ok::<_, SyncError>((needs, tx, read))
Ok::<_, SyncError>((their_sync_state, tx, read))
}.await
)
}.instrument(info_span!("sync_client_handshake", %actor_id, %addr))
}))
.collect::<Vec<(ActorId, SocketAddr, Result<_, SyncError>)>>()
.await;

debug!("collected member needs and such!");
debug!("collected member state and such!");

#[allow(clippy::manual_try_fold)]
let syncers = results
.into_iter()
.fold(Ok(vec![]), |agg, (actor_id, addr, res)| match res {
Ok((needs, tx, read)) => {
Ok((state, tx, read)) => {
let mut v = agg.unwrap_or_default();
v.push((actor_id, addr, needs, tx, read));
v.push((actor_id, addr, state, tx, read));
Ok(v)
}
Err(e) => {
@@ -1179,53 +1153,15 @@ pub async fn parallel_sync(
let len = syncers.len();

let (readers, mut servers) = {
let mut rng = rand::thread_rng();
syncers.into_iter().fold(
(Vec::with_capacity(len), Vec::with_capacity(len)),
|(mut readers, mut servers), (actor_id, addr, needs, tx, read)| {
if needs.is_empty() {
trace!(%actor_id, "no needs!");
return (readers, servers);
}
|(mut readers, mut servers), (actor_id, addr, state, tx, read)| {
readers.push((actor_id, read));

trace!(%actor_id, "needs: {needs:?}");

debug!(%actor_id, %addr, "needs len: {}", needs.values().map(|needs| needs.iter().map(|need| match need {
SyncNeedV1::Full {versions} => (versions.end().0 - versions.start().0) as usize + 1,
SyncNeedV1::Partial {..} => 0,
SyncNeedV1::Empty {..} => 0,
}).sum::<usize>()).sum::<usize>());

let actor_needs = needs
.into_iter()
.flat_map(|(actor_id, needs)| {
let mut needs: Vec<_> = needs
.into_iter()
.flat_map(|need| match need {
// chunk the versions, sometimes it's 0..=1000000 and that's far too big for a chunk!
SyncNeedV1::Full { versions } => chunk_range(versions, 10)
.map(|versions| SyncNeedV1::Full { versions })
.collect(),

need => vec![need],
})
.collect();

// NOTE: IMPORTANT! shuffle the vec so we don't keep looping over the same later on
needs.shuffle(&mut rng);

needs
.into_iter()
.map(|need| (actor_id, need))
.collect::<Vec<_>>()
})
.collect::<VecDeque<_>>();

servers.push((
actor_id,
addr,
actor_needs,
state,
tx,
));

@@ -1244,103 +1180,34 @@ pub async fn parallel_sync(
let mut send_buf = BytesMut::new();
let mut encode_buf = BytesMut::new();

// already requested full versions
let mut req_full: HashMap<ActorId, RangeInclusiveSet<Version>> = HashMap::new();

// already requested partial version sequences
let mut req_partials: HashMap<(ActorId, Version), RangeInclusiveSet<CrsqlSeq>> = HashMap::new();

let start = Instant::now();

loop {
if servers.is_empty() {
break;
}
let mut next_servers = Vec::with_capacity(servers.len());
'servers: for (server_actor_id, addr, mut needs, mut tx) in servers {
if needs.is_empty() {
continue;
}

let mut drained = 0;

while drained < 10 {
let (actor_id, need) = match needs.pop_front() {
Some(popped) => popped,
None => {
break;
}
};

drained += 1;

let actual_needs = match need {
SyncNeedV1::Full { versions } => {
let range = req_full.entry(actor_id).or_default();

let mut new_versions =
RangeInclusiveSet::from_iter([versions.clone()].into_iter());

// check if we've already requested
for overlap in range.overlapping(&versions) {
new_versions.remove(overlap.clone());
}

if new_versions.is_empty() {
continue;
}

new_versions
.into_iter()
.map(|versions| {
range.remove(versions.clone());
SyncNeedV1::Full { versions }
})
.collect()
}
SyncNeedV1::Partial { version, seqs } => {
let range = req_partials.entry((actor_id, version)).or_default();
let mut new_seqs =
RangeInclusiveSet::from_iter(seqs.clone().into_iter());

for seqs in seqs {
for overlap in range.overlapping(&seqs) {
new_seqs.remove(overlap.clone());
}
}

if new_seqs.is_empty() {
continue;
}

vec![SyncNeedV1::Partial {
version,
seqs: new_seqs
.into_iter()
.map(|seqs| {
range.remove(seqs.clone());
seqs
})
.collect(),
}]
}
need => {vec![need]},
};

if actual_needs.is_empty() {
warn!(%server_actor_id, %actor_id, %addr, "nothing to send!");
continue;
'servers: for (server_actor_id, addr, state, mut tx) in servers {
let mut last_need: HashMap<ActorId, Vec<SyncNeedV1>> = HashMap::new();
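// pull at most 10 batches of needs from this server per pass, merging each
// batch into our own state so the same versions are never re-requested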
for _ in 0..10 {
let needs = our_sync_state.get_n_needs(&state, 10);
last_need = needs.clone();
if needs.is_empty() {
break;
}

let req_len = actual_needs.len();
our_sync_state.merge_needs(&needs);

let needs = Vec::from_iter(needs.into_iter());
let req_len = needs.len();

if let Err(e) = encode_sync_msg(
&mut codec,
&mut encode_buf,
&mut send_buf,
SyncMessage::V1(SyncMessageV1::Request(vec![(actor_id, actual_needs)])),
SyncMessage::V1(SyncMessageV1::Request(needs)),
) {
error!(%server_actor_id, %actor_id, %addr, "could not encode sync request: {e} (elapsed: {:?})", start.elapsed());
error!(%server_actor_id, %server_actor_id, %addr, "could not encode sync request: {e} (elapsed: {:?})", start.elapsed());
continue 'servers;
}

@@ -1357,15 +1224,34 @@ pub async fn parallel_sync(
tokio::task::yield_now().await;
}

if needs.is_empty() {
if last_need.is_empty() {
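// nothing left to request from this server: send an Empty need if their
// last-cleared timestamp is newer than what we have seen, then finish the stream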
if let Some(ts) = state.last_cleared_ts {
if let Some(last_seen) = our_empty_ts.get(&server_actor_id) {
if last_seen.is_none() || last_seen.unwrap() < ts {
if let Err(e) = encode_sync_msg(
&mut codec,
&mut encode_buf,
&mut send_buf,
SyncMessage::V1(SyncMessageV1::Request(vec![(server_actor_id, vec![SyncNeedV1::Empty { ts: *last_seen }])])),
) {
error!(%server_actor_id, %server_actor_id, %addr, "could not encode sync request: {e} (elapsed: {:?})", start.elapsed());
} else {
if let Err(e) = write_buf(&mut send_buf, &mut tx).await {
error!(%server_actor_id, %addr, "could not write sync requests: {e} (elapsed: {:?})", start.elapsed());
}
}
}
}
}

if let Err(e) = tx.finish().instrument(info_span!("quic_finish")).await {
warn!("could not finish stream while sending sync requests: {e}");
}
debug!(%server_actor_id, %addr, "done trying to sync w/ actor after {:?}", start.elapsed());
continue;
}

next_servers.push((server_actor_id, addr, needs, tx));
next_servers.push((server_actor_id, addr, state, tx));
}
servers = next_servers;
}
@@ -1466,6 +1352,40 @@ pub async fn parallel_sync(
.sum::<usize>())
}

pub fn distribute_available_needs(
mut our_state: SyncStateV1,
mut states: Vec<(ActorId, SyncStateV1)>,
) -> HashMap<ActorId, Vec<(ActorId, SyncNeedV1)>> {
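// Round-robin over the peers' states: pull a small batch of needs from each,
// merging every batch back into `our_state` so overlapping versions are only
// ever requested from a single peer.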
let mut final_needs: HashMap<ActorId, Vec<(ActorId, SyncNeedV1)>> = HashMap::new();

while !states.is_empty() {
let mut remove_keys = vec![];
for (actor_id, state) in &states {
let actor_needs = our_state.get_n_needs(&state, 10);
// we can get no more needs from this actor
if actor_needs.is_empty() {
remove_keys.push(*actor_id);
} else {
let needs: Vec<_> = actor_needs
.clone()
.into_iter()
.flat_map(|(actor_id, needs)| {
needs.into_iter().map(move |need| (actor_id, need))
})
.collect();
final_needs
.entry(*actor_id)
.or_default()
.extend_from_slice(&needs);
our_state.merge_needs(&actor_needs);
}
}
states.retain(|(actor, _)| !remove_keys.contains(actor));
}

final_needs
}

#[tracing::instrument(skip(agent, bookie, their_actor_id, read, write), fields(actor_id = %their_actor_id), err)]
pub async fn serve_sync(
agent: &Agent,
@@ -1740,6 +1660,7 @@ mod tests {

use super::*;


#[tokio::test(flavor = "multi_thread")]
async fn test_handle_need() -> eyre::Result<()> {
_ = tracing_subscriber::fmt::try_init();
1 change: 0 additions & 1 deletion crates/corro-types/src/agent.rs
@@ -1213,7 +1213,6 @@ impl VersionsSnapshot {
conn.prepare_cached("INSERT OR REPLACE INTO __corro_sync_state VALUES (?, ?)")?
.execute((self.actor_id, ts))?;
}

Ok(())
}

18 changes: 14 additions & 4 deletions crates/corro-types/src/broadcast.rs
@@ -1,4 +1,9 @@
use std::{cmp, fmt, io, num::NonZeroU32, ops::{Deref, RangeInclusive}, time::Duration};
use std::{
cmp, fmt, io,
num::NonZeroU32,
ops::{Deref, RangeInclusive},
time::Duration,
};

use bytes::{Bytes, BytesMut};
use corro_api_types::{row_to_change, Change};
@@ -165,9 +170,14 @@ impl Changeset {
// determine the estimated resource cost of processing a change
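// e.g. (illustrative): Empty { versions: 1..=100 } is capped at min(100, 20) = 20,
// EmptySet over [1..=5, 10..=40] costs 5 + 20 = 25, and Full with 7 changes costs 7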
pub fn processing_cost(&self) -> usize {
match self {
Changeset::Empty { versions, .. } => cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20),
Changeset::EmptySet { versions, .. } => versions.iter().map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)).sum::<usize>(),
Changeset::Full { changes, ..} => changes.len(),
Changeset::Empty { versions, .. } => {
cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)
}
Changeset::EmptySet { versions, .. } => versions
.iter()
.map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20))
.sum::<usize>(),
Changeset::Full { changes, .. } => changes.len(),
}
}
