From f5ae34eb442fed1db5be246589ba0d8f115ef517 Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Wed, 20 Sep 2023 11:21:55 -0400 Subject: [PATCH] Adaptive chunking (#58) * try holding a write lock for less time while still keeping it safe * maybe this'll work * a bit more logging * attempt to handle multiple changes at once * try to fix spammy rtt warning + try to dedup changes * bomb when errors are encountered inserting multiple changes * seqs start at 0 * disabling some optimizations * this is weird, but testing if it's the issue * why this would make things unreliable is beyond me * one transaction = one db version, forgot about that... * attempt to get rid of unique constraint errors * better query for counting buffered changes, if count drops to zero then we should update the gauge * more logs * attempt to clean up mess * continue loop afterwards * clear buffered meta when applying complete change * shorter sync backoff --- crates/corro-agent/src/agent.rs | 666 +++++++++++++---------- crates/corro-agent/src/api/peer.rs | 436 +++++++++------ crates/corro-agent/src/api/public/mod.rs | 16 +- crates/corro-agent/src/transport.rs | 4 +- crates/corro-types/src/agent.rs | 14 +- crates/corro-types/src/broadcast.rs | 12 + 6 files changed, 684 insertions(+), 464 deletions(-) diff --git a/crates/corro-agent/src/agent.rs b/crates/corro-agent/src/agent.rs index 0e75b9e8..76b3a880 100644 --- a/crates/corro-agent/src/agent.rs +++ b/crates/corro-agent/src/agent.rs @@ -49,10 +49,11 @@ use bytes::{Bytes, BytesMut}; use foca::{Member, Notification}; use futures::{FutureExt, TryFutureExt}; use hyper::{server::conn::AddrIncoming, StatusCode}; +use itertools::Itertools; use metrics::{counter, gauge, histogram, increment_counter}; use parking_lot::RwLock; use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng}; -use rangemap::RangeInclusiveSet; +use rangemap::{RangeInclusiveMap, RangeInclusiveSet}; use rusqlite::{named_params, params, Connection, OptionalExtension, Transaction}; use spawn::spawn_counted; use speedy::Readable; @@ -74,7 +75,7 @@ use trust_dns_resolver::{ proto::rr::{RData, RecordType}, }; -const MAX_SYNC_BACKOFF: Duration = Duration::from_secs(60); // 1 minute oughta be enough, we're constantly getting broadcasts randomly + targetted +const MAX_SYNC_BACKOFF: Duration = Duration::from_secs(15); // 1 minute oughta be enough, we're constantly getting broadcasts randomly + targetted const RANDOM_NODES_CHOICES: usize = 10; const COMPACT_BOOKED_INTERVAL: Duration = Duration::from_secs(300); const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300); @@ -187,6 +188,17 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age for ((actor_id, version), (seqs, last_seq, ts)) in partials { debug!(%actor_id, version, "looking at partials (seq: {seqs:?}, last_seq: {last_seq})"); let ranges = bk.entry(actor_id).or_default(); + + if let Some(known) = ranges.get(&version) { + warn!(%actor_id, version, "found partial data that has been applied, clearing buffered meta, known: {known:?}"); + + let mut conn = pool.write_priority().await?; + let tx = conn.transaction()?; + clear_buffered_meta(&tx, actor_id, version)?; + tx.commit()?; + continue; + } + let gaps_count = seqs.gaps(&(0..=last_seq)).count(); ranges.insert( version..=version, @@ -304,7 +316,8 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { let agent = agent.clone(); async move { let stream = ReceiverStream::new(rtt_rx); - let chunker = stream.chunks_timeout(20, Duration::from_secs(1)); + // we can handle a lot of them I think... + let chunker = stream.chunks_timeout(1024, Duration::from_secs(1)); tokio::pin!(chunker); while let Some(chunks) = chunker.next().await { let mut members = agent.members().write(); @@ -1031,7 +1044,7 @@ fn collect_metrics(agent: &Agent) { } match conn - .prepare_cached("SELECT actor_id, count(site_id) FROM __corro_members LEFT JOIN __corro_buffered_changes ON site_id = actor_id GROUP BY actor_id") + .prepare_cached("SELECT actor_id, (select count(site_id) FROM __corro_buffered_changes WHERE site_id = actor_id) FROM __corro_members") .and_then(|mut prepped| { prepped .query_map((), |row| { @@ -1177,13 +1190,7 @@ async fn handle_gossip( let priority_label = if high_priority { "high" } else { "normal" }; counter!("corro.broadcast.recv.count", messages.len() as u64, "priority" => priority_label); - let mut rebroadcast = vec![]; - - for msg in messages { - if let Some(msg) = process_msg(&agent, msg).await? { - rebroadcast.push(msg); - } - } + let rebroadcast = process_messages(&agent, messages).await?; for msg in rebroadcast { if let Err(e) = agent @@ -1434,7 +1441,7 @@ async fn resolve_bootstrap( } fn store_empty_changeset( - tx: Transaction, + tx: &Transaction, actor_id: ActorId, versions: RangeInclusive, ) -> Result<(), rusqlite::Error> { @@ -1455,16 +1462,26 @@ fn store_empty_changeset( ])?; for version in versions { - tx.prepare_cached("DELETE FROM __corro_seq_bookkeeping WHERE site_id = ? AND version = ?")? - .execute(params![actor_id, version,])?; - - tx.prepare_cached( - "DELETE FROM __corro_buffered_changes WHERE site_id = ? AND version = ?", - )? - .execute(params![actor_id, version,])?; + clear_buffered_meta(tx, actor_id, version)?; } - tx.commit() + Ok(()) +} + +fn clear_buffered_meta(tx: &Transaction, actor_id: ActorId, version: i64) -> rusqlite::Result<()> { + // remove all buffered changes for cleanup purposes + let count = tx + .prepare_cached("DELETE FROM __corro_buffered_changes WHERE site_id = ? AND version = ?")? + .execute(params![actor_id, version])?; + debug!(%actor_id, version, "deleted {count} buffered changes"); + + // delete all bookkept sequences for this version + let count = tx + .prepare_cached("DELETE FROM __corro_seq_bookkeeping WHERE site_id = ? AND version = ?")? + .execute(params![actor_id, version])?; + debug!(%actor_id, version, "deleted {count} sequences in bookkeeping"); + + Ok(()) } async fn process_fully_buffered_changes( @@ -1553,21 +1570,7 @@ async fn process_fully_buffered_changes( info!(%actor_id, version, "no buffered rows, skipped insertion into crsql_changes"); } - // remove all buffered changes for cleanup purposes - let count = tx - .prepare_cached( - "DELETE FROM __corro_buffered_changes WHERE site_id = ? AND version = ?", - )? - .execute(params![actor_id.as_bytes(), version])?; - debug!(%actor_id, version, "deleted {count} buffered changes"); - - // delete all bookkept sequences for this version - let count = tx - .prepare_cached( - "DELETE FROM __corro_seq_bookkeeping WHERE site_id = ? AND version = ?", - )? - .execute(params![actor_id, version])?; - debug!(%actor_id, version, "deleted {count} sequences in bookkeeping"); + clear_buffered_meta(&tx, actor_id, version)?; let rows_impacted: i64 = tx .prepare_cached("SELECT crsql_rows_impacted()")? @@ -1582,13 +1585,25 @@ async fn process_fully_buffered_changes( tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES (?, ?, ?, ?, ?);")?.execute(params![actor_id, version, db_version, last_seq, ts])?; + info!(%actor_id, version, "inserted bookkeeping row after buffered insert"); + Some(KnownDbVersion::Current { db_version, last_seq, ts, }) } else { - tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, last_seq, ts) VALUES (?, ?, ?, ?);")?.execute(params![actor_id, version, last_seq, ts])?; + let _inserted = tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, last_seq, ts) VALUES (?, ?, ?, ?);")?.execute(params![actor_id, version, last_seq, ts])?; + + // if inserted > 0 { + // info!(%actor_id, version, "inserted CLEARED bookkeeping row after buffered insert"); + // Some(KnownDbVersion::Cleared) + // } else { + // warn!(%actor_id, version, "bookkeeping row already existed, it shouldn't matter but it would be nice to fix this issue"); + // None + // } + + info!(%actor_id, version, "inserted CLEARED bookkeeping row after buffered insert"); Some(KnownDbVersion::Cleared) }; @@ -1607,309 +1622,380 @@ async fn process_fully_buffered_changes( Ok(inserted) } -pub async fn process_single_change( +pub async fn process_multiple_changes( agent: &Agent, - change: ChangeV1, -) -> Result, ChangeError> { + changes: Vec, +) -> Result, ChangeError> { let bookie = agent.bookie(); - let is_complete = change.is_complete(); + let mut seen = HashSet::new(); + let mut unknown_changes = Vec::with_capacity(changes.len()); + for change in changes { + let versions = change.versions(); + let seqs = change.seqs(); + if !seen.insert((change.actor_id, versions, seqs.cloned())) { + continue; + } + if bookie + .contains(&change.actor_id, change.versions(), change.seqs()) + .await + { + continue; + } - let ChangeV1 { - actor_id, - changeset, - } = change; + unknown_changes.push(change); + } - if bookie - .contains(&actor_id, changeset.versions(), changeset.seqs()) - .await + // NOTE: should we use `Vec::with_capacity(unknown_changes.len())?` + let mut res = vec![]; + + unknown_changes.sort_by_key(|change| change.actor_id); + + let mut conn = agent.pool().write_normal().await?; + + for (actor_id, changes) in unknown_changes + .into_iter() + .group_by(|change| change.actor_id) + .into_iter() { - trace!( - "already seen these versions from: {actor_id}, version: {:?}", - changeset.versions() - ); - return Ok(None); - } + block_in_place(|| { + let mut knowns = vec![]; + let mut changesets = vec![]; - debug!( - "received {} changes to process from: {actor_id}, versions: {:?}, seqs: {:?} (last_seq: {:?})", - changeset.len(), - changeset.versions(), - changeset.seqs(), - changeset.last_seq() - ); + { + let booked = bookie.for_actor_blocking(actor_id); + let mut booked_write = booked.blocking_write(); - let booked = bookie.for_actor(actor_id).await; - let (db_version, changeset) = { - let mut conn = agent.pool().write_normal().await?; + let mut seen = RangeInclusiveMap::new(); - let mut booked_write = booked.write().await; - // check again, might've changed since we acquired the lock - if booked_write.contains_all(changeset.versions(), changeset.seqs()) { - trace!("previously unknown versions are now deemed known, aborting inserts"); - return Ok(None); - } + for change in changes { + let seqs = change.seqs(); + if booked_write.contains_all(change.versions(), change.seqs()) { + trace!( + "previously unknown versions are now deemed known, aborting inserts" + ); + continue; + } - let (changeset, db_version) = block_in_place(move || { - let tx = conn.transaction()?; + let versions = change.versions(); - let versions = changeset.versions(); - let ChangesetParts { - version, - changes, - seqs, - last_seq, - ts, - } = match changeset.into_parts() { - None => { - store_empty_changeset(tx, actor_id, versions.clone())?; - booked_write.insert_many(versions.clone(), KnownDbVersion::Cleared); - return Ok((Changeset::Empty { versions }, None)); + // check if we've seen this version here... + if versions.clone().all(|version| match seqs { + Some(check_seqs) => match seen.get(&version) { + Some(known) => match known { + KnownDbVersion::Partial { seqs, .. } => { + check_seqs.clone().all(|seq| seqs.contains(&seq)) + } + KnownDbVersion::Current { .. } | KnownDbVersion::Cleared => true, + }, + None => false, + }, + None => seen.contains_key(&version), + }) { + continue; + } + + let tx = conn.transaction()?; + + let (known, changeset) = match process_single_version(&tx, change) { + Ok(res) => res, + Err(e) => { + error!(%actor_id, ?versions, "could not process single change: {e}"); + continue; + } + }; + + tx.commit()?; + + seen.insert(versions.clone(), known.clone()); + + changesets.push((actor_id, changeset)); + knowns.push((versions, known)); } - Some(parts) => parts, - }; - // if not a full range! - if !is_complete { - debug!(%actor_id, version, "incomplete change, seqs: {seqs:?}, last_seq: {last_seq:?}, len: {}", changes.len()); - let mut inserted = 0; - for change in changes.iter() { - trace!("buffering change! {change:?}"); - - // insert change, do nothing on conflict - inserted += tx.prepare_cached( - r#" - INSERT INTO __corro_buffered_changes - ("table", pk, cid, val, col_version, db_version, site_id, cl, seq, version) - VALUES - (:table, :pk, :cid, :val, :col_version, :db_version, :site_id, :cl, :seq, :version) - ON CONFLICT (site_id, db_version, version, seq) - DO NOTHING - "#, - )? - .execute(named_params!{ - ":table": change.table.as_str(), - ":pk": change.pk, - ":cid": change.cid.as_str(), - ":val": &change.val, - ":col_version": change.col_version, - ":db_version": change.db_version, - ":site_id": &change.site_id, - ":cl": change.cl, - ":seq": change.seq, - ":version": version, - })?; + for (versions, known) in knowns { + if let KnownDbVersion::Partial { seqs, last_seq, .. } = &known { + let full_seqs_range = 0..=*last_seq; + let gaps_count = seqs.gaps(&full_seqs_range).count(); + let version = *versions.start(); + if gaps_count == 0 { + // if we have no gaps, then we can schedule applying all these changes. + info!(%actor_id, version, "we now have all versions, notifying for background jobber to insert buffered changes! seqs: {seqs:?}, expected full seqs: {full_seqs_range:?}"); + let tx_apply = agent.tx_apply().clone(); + tokio::spawn(async move { + if let Err(e) = tx_apply.send((actor_id, version)).await { + error!("could not send trigger for applying fully buffered changes later: {e}"); + } + }); + } else { + debug!(%actor_id, version, "still have {gaps_count} gaps in partially buffered seqs"); + } + } + booked_write.insert_many(versions, known); } + } - debug!(%actor_id, version, "buffered {inserted} changes"); + for (actor_id, changeset) in changesets { + process_subs(agent, changeset.changes()); + res.push((actor_id, changeset)); + } - // calculate all known sequences for the actor + version combo - let mut seqs_in_bookkeeping: RangeInclusiveSet = tx - .prepare_cached( - " + Ok::<_, ChangeError>(()) + })?; + } + + Ok(res) +} + +fn process_incomplete_version( + tx: &Transaction, + actor_id: ActorId, + parts: &ChangesetParts, +) -> rusqlite::Result { + let ChangesetParts { + version, + changes, + seqs, + last_seq, + ts, + } = parts; + + debug!(%actor_id, version, "incomplete change, seqs: {seqs:?}, last_seq: {last_seq:?}, len: {}", changes.len()); + let mut inserted = 0; + for change in changes.iter() { + trace!("buffering change! {change:?}"); + + // insert change, do nothing on conflict + inserted += tx.prepare_cached( + r#" + INSERT INTO __corro_buffered_changes + ("table", pk, cid, val, col_version, db_version, site_id, cl, seq, version) + VALUES + (:table, :pk, :cid, :val, :col_version, :db_version, :site_id, :cl, :seq, :version) + ON CONFLICT (site_id, db_version, version, seq) + DO NOTHING + "#, + )? + .execute(named_params!{ + ":table": change.table.as_str(), + ":pk": change.pk, + ":cid": change.cid.as_str(), + ":val": &change.val, + ":col_version": change.col_version, + ":db_version": change.db_version, + ":site_id": &change.site_id, + ":cl": change.cl, + ":seq": change.seq, + ":version": version, + })?; + } + + debug!(%actor_id, version, "buffered {inserted} changes"); + + // calculate all known sequences for the actor + version combo + let mut seqs_in_bookkeeping: RangeInclusiveSet = tx + .prepare_cached( + " SELECT start_seq, end_seq FROM __corro_seq_bookkeeping WHERE site_id = ? AND version = ? ", - )? - .query_map(params![actor_id, version], |row| { - Ok(row.get(0)?..=row.get(1)?) - })? - .collect::>()?; + )? + .query_map(params![actor_id, version], |row| { + Ok(row.get(0)?..=row.get(1)?) + })? + .collect::>()?; - let orig_seqs_in_bookkeeping = seqs_in_bookkeeping.clone(); + // immediately add the new range to the recorded seqs ranges + seqs_in_bookkeeping.insert(seqs.clone()); - // immediately add the new range to the recorded seqs ranges - seqs_in_bookkeeping.insert(seqs.clone()); + tx.prepare_cached("DELETE FROM __corro_seq_bookkeeping WHERE site_id = ? AND version = ?")? + .execute(params![actor_id, version])?; - // all seq for this version (from 0 to the last seq, inclusively) - let full_seqs_range = 0..=last_seq; + for range in seqs_in_bookkeeping.iter() { + tx.prepare_cached( + " + INSERT INTO __corro_seq_bookkeeping (site_id, version, start_seq, end_seq, last_seq, ts) + VALUES (?, ?, ?, ?, ?, ?) + ", + )? + .execute(params![ + actor_id, + version, + range.start(), + range.end(), + last_seq, + ts + ])?; + } - // figure out how many seq gaps we have between 0 and the last seq for this version - let gaps_count = seqs_in_bookkeeping.gaps(&full_seqs_range).count(); + Ok(KnownDbVersion::Partial { + seqs: seqs_in_bookkeeping.clone(), + last_seq: *last_seq, + ts: *ts, + }) +} - tx.prepare_cached( - "DELETE FROM __corro_seq_bookkeeping WHERE site_id = ? AND version = ?", - )? - .execute(params![actor_id, version])?; +fn process_complete_version( + tx: &Transaction, + actor_id: ActorId, + versions: RangeInclusive, + parts: Option, +) -> rusqlite::Result<(KnownDbVersion, Changeset)> { + let ChangesetParts { + version, + changes, + seqs, + last_seq, + ts, + } = match parts { + None => { + store_empty_changeset(tx, actor_id, versions.clone())?; + info!(%actor_id, ?versions, "cleared empty versions range"); + // booked_write.insert_many(versions.clone(), KnownDbVersion::Cleared); + return Ok((KnownDbVersion::Cleared, Changeset::Empty { versions })); + } + Some(parts) => parts, + }; - for range in seqs_in_bookkeeping.iter() { - tx.prepare_cached("INSERT INTO __corro_seq_bookkeeping (site_id, version, start_seq, end_seq, last_seq, ts) - VALUES (?, ?, ?, ?, ?, ?) - ", - )? - .execute(params![ - actor_id, - version, - range.start(), - range.end(), - last_seq, - ts - ])?; - } + info!(%actor_id, version, "complete change, applying right away! seqs: {seqs:?}, last_seq: {last_seq}"); - tx.commit()?; + let mut impactful_changeset = vec![]; - let changeset = Changeset::Full { - version, - changes, - seqs: seqs.clone(), - last_seq, - ts, - }; + let mut last_rows_impacted = 0; - booked_write.insert_many( - changeset.versions(), - KnownDbVersion::Partial { - seqs: seqs_in_bookkeeping.clone(), - last_seq, - ts, - }, - ); + let changes_len = changes.len(); - // if we have no gaps, then we can schedule applying all these changes. - if gaps_count == 0 { - // no gaps - info!(actor_id = %actor_id, version, "we now have all versions, notifying for background jobber to insert buffered changes! seqs: {seqs:?}, expected full seqs: {full_seqs_range:?}, computed seqs: {seqs_in_bookkeeping:?} (original: {orig_seqs_in_bookkeeping:?})"); - let tx_apply = agent.tx_apply().clone(); - tokio::spawn(async move { - if let Err(e) = tx_apply.send((actor_id, version)).await { - error!("could not send trigger for applying fully buffered changes later: {e}"); - } - }); - } else { - debug!(actor = %agent.actor_id(), "still have {gaps_count} gaps in partially buffered seqs"); - } + let mut changes_per_table = BTreeMap::new(); - return Ok((changeset, None)); - } + for change in changes { + trace!("inserting change! {change:?}"); + tx.prepare_cached( + r#" + INSERT INTO crsql_changes + ("table", pk, cid, val, col_version, db_version, site_id, cl, seq) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?) + "#, + )? + .execute(params![ + change.table.as_str(), + change.pk, + change.cid.as_str(), + &change.val, + change.col_version, + change.db_version, + &change.site_id, + change.cl, + change.seq, + ])?; + let rows_impacted: i64 = tx + .prepare_cached("SELECT crsql_rows_impacted()")? + .query_row((), |row| row.get(0))?; - debug!(%actor_id, version, "complete change, applying right away! seqs: {seqs:?}, last_seq: {last_seq}"); - - let mut impactful_changeset = vec![]; - - let mut last_rows_impacted = 0; - - let changes_len = changes.len(); - - let mut changes_per_table = BTreeMap::new(); - - for change in changes { - trace!("inserting change! {change:?}"); - tx.prepare_cached( - r#" - INSERT INTO crsql_changes - ("table", pk, cid, val, col_version, db_version, site_id, cl, seq) - VALUES - (?, ?, ?, ?, ?, ?, ?, ?, ?) - "#, - )? - .execute(params![ - change.table.as_str(), - change.pk, - change.cid.as_str(), - &change.val, - change.col_version, - change.db_version, - &change.site_id, - change.cl, - change.seq, - ])?; - let rows_impacted: i64 = tx - .prepare_cached("SELECT crsql_rows_impacted()")? - .query_row((), |row| row.get(0))?; - - if rows_impacted > last_rows_impacted { - debug!(actor = %agent.actor_id(), "inserted a the change into crsql_changes"); - impactful_changeset.push(change); - if let Some(c) = impactful_changeset.last() { - if let Some(counter) = changes_per_table.get_mut(&c.table) { - *counter += 1; - } else { - changes_per_table.insert(c.table.clone(), 1); - } - } + if rows_impacted > last_rows_impacted { + trace!("inserted the change into crsql_changes"); + impactful_changeset.push(change); + if let Some(c) = impactful_changeset.last() { + if let Some(counter) = changes_per_table.get_mut(&c.table) { + *counter += 1; + } else { + changes_per_table.insert(c.table.clone(), 1); } - last_rows_impacted = rows_impacted; } + } + last_rows_impacted = rows_impacted; + } - let db_version: i64 = tx - .prepare_cached("SELECT crsql_next_db_version()")? - .query_row((), |row| row.get(0))?; + let db_version: i64 = tx + .prepare_cached("SELECT crsql_next_db_version()")? + .query_row((), |row| row.get(0))?; - let (known_version, new_changeset, db_version) = if impactful_changeset.is_empty() { - debug!( - "inserting CLEARED bookkeeping row for actor {actor_id}, version: {version}, db_version: {db_version}, ts: {ts:?} (recv changes: {changes_len}, rows impacted: {last_rows_impacted})", - ); - tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version) VALUES (?, ?, ?);")?.execute(params![actor_id, version, version])?; - (KnownDbVersion::Cleared, Changeset::Empty { versions }, None) - } else { - debug!( - "inserting bookkeeping row for actor {actor_id}, version: {version}, db_version: {db_version}, ts: {ts:?} (recv changes: {changes_len}, rows impacted: {last_rows_impacted})", - ); - tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES (?, ?, ?, ?, ?);")?.execute(params![actor_id, version, db_version, last_seq, ts])?; - ( - KnownDbVersion::Current { - db_version, - last_seq, - ts, - }, - Changeset::Full { - version, - changes: impactful_changeset, - seqs, - last_seq, - ts, - }, - Some(db_version), - ) - }; + let (known_version, new_changeset) = if impactful_changeset.is_empty() { + debug!(%actor_id, version, + "inserting CLEARED bookkeeping row db_version: {db_version}, ts: {ts:?} (recv changes: {changes_len}, rows impacted: {last_rows_impacted})", + ); + tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version) VALUES (?, ?, ?);")? + .execute(params![actor_id, version, version])?; + (KnownDbVersion::Cleared, Changeset::Empty { versions }) + } else { + debug!(%actor_id, version, + "inserting bookkeeping row db_version: {db_version}, ts: {ts:?} (recv changes: {changes_len}, rows impacted: {last_rows_impacted})", + ); + tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES (?, ?, ?, ?, ?);")?.execute(params![actor_id, version, db_version, last_seq, ts])?; + ( + KnownDbVersion::Current { + db_version, + last_seq, + ts, + }, + Changeset::Full { + version, + changes: impactful_changeset, + seqs, + last_seq, + ts, + }, + ) + }; - debug!("inserted bookkeeping row"); + debug!(%actor_id, version, "inserted bookkeeping row"); - tx.commit()?; + // in case we got both buffered data and a complete set of changes + clear_buffered_meta(tx, actor_id, version)?; - for (table_name, count) in changes_per_table { - counter!("corro.changes.committed", count, "table" => table_name.to_string(), "source" => "remote"); - } + for (table_name, count) in changes_per_table { + counter!("corro.changes.committed", count, "table" => table_name.to_string(), "source" => "remote"); + } + + Ok::<_, rusqlite::Error>((known_version, new_changeset)) +} - debug!("committed transaction"); +fn process_single_version( + tx: &Transaction, + change: ChangeV1, +) -> rusqlite::Result<(KnownDbVersion, Changeset)> { + let ChangeV1 { + actor_id, + changeset, + } = change; - booked_write.insert_many(new_changeset.versions(), known_version); - trace!("inserted into in-memory bookkeeping"); + let versions = changeset.versions(); - Ok::<_, rusqlite::Error>((new_changeset, db_version)) - })?; + let (known, changeset) = if changeset.is_complete() { + process_complete_version(tx, actor_id, versions, changeset.into_parts())? + } else { + let parts = changeset.into_parts().unwrap(); + let known = process_incomplete_version(tx, actor_id, &parts)?; - (db_version, changeset) + (known, parts.into()) }; - if db_version.is_some() { - process_subs(agent, changeset.changes()); - } - trace!("processed subscriptions, if any!"); - - Ok(Some(changeset)) + Ok((known, changeset)) } -async fn process_msg( +async fn process_messages( agent: &Agent, - bcast: BroadcastV1, -) -> Result, ChangeError> { - Ok(match bcast { - BroadcastV1::Change(change) => { - let actor_id = change.actor_id; - let changeset = process_single_change(agent, change).await?; + bcast: Vec, +) -> Result, ChangeError> { + let changes = bcast + .into_iter() + .map(|bcast| match bcast { + BroadcastV1::Change(change) => change, + }) + .collect(); - changeset.map(|changeset| { - BroadcastV1::Change(ChangeV1 { - actor_id, - changeset, - }) + Ok(process_multiple_changes(agent, changes) + .await? + .into_iter() + .map(|(actor_id, changeset)| { + BroadcastV1::Change(ChangeV1 { + actor_id, + changeset, }) - } - }) + }) + .collect()) } pub fn process_subs(agent: &Agent, changeset: &[Change]) { @@ -2769,7 +2855,7 @@ pub mod tests { .query_row("SELECT crsql_db_version();", (), |row| row.get(0))?; assert_eq!(db_version, 1); - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(2)).await; let ta2 = launch_test_agent( |conf| { @@ -2796,7 +2882,7 @@ pub mod tests { ) .await?; - sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(5)).await; { let conn = ta2.agent.pool().read().await?; diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index e6ea3df4..71d31d5c 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -3,18 +3,18 @@ use std::collections::{BTreeSet, HashMap}; use std::net::SocketAddr; use std::ops::RangeInclusive; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use bytes::{Buf, BufMut, BytesMut}; use corro_types::agent::{Agent, KnownDbVersion, SplitPool}; -use corro_types::broadcast::{ChangeV1, Changeset}; -use corro_types::change::row_to_change; +use corro_types::broadcast::{ChangeV1, Changeset, Timestamp}; +use corro_types::change::{row_to_change, Change}; use corro_types::config::{GossipConfig, TlsClientConfig}; use corro_types::sync::{SyncMessage, SyncMessageEncodeError, SyncMessageV1, SyncStateV1}; use futures::{Stream, TryFutureExt}; use metrics::counter; use quinn::{RecvStream, SendStream}; -use rusqlite::params; +use rusqlite::{params, Connection}; use speedy::Writable; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{channel, Sender}; @@ -22,9 +22,8 @@ use tokio::task::block_in_place; use tokio_stream::StreamExt; use tokio_util::codec::{Encoder, FramedRead, LengthDelimitedCodec}; use tracing::{debug, error, info, trace, warn}; -use tripwire::{Outcome, TimeoutFutureExt}; -use crate::agent::{process_single_change, SyncRecvError}; +use crate::agent::{process_multiple_changes, SyncRecvError}; use crate::api::public::ChunkedChanges; use corro_types::{ @@ -320,6 +319,9 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification { } const MAX_CHANGES_BYTES_PER_MESSAGE: usize = 64 * 1024; +const MIN_CHANGES_BYTES_PER_MESSAGE: usize = 2 * 1024; + +const ADAPT_CHUNK_SIZE_THRESHOLD: Duration = Duration::from_millis(500); async fn process_range( booked: &Booked, @@ -357,8 +359,7 @@ async fn process_range( } for version in versions { - let bw = booked.write().await; - let known = bw.get(&version); + let known = { booked.read().await.get(&version).cloned() }; if let Some(known_version) = known { process_version( pool, @@ -366,6 +367,7 @@ async fn process_range( is_local, version, known_version, + booked, vec![], sender, ) @@ -380,110 +382,84 @@ async fn process_range( Ok(()) } -async fn process_version( - pool: &SplitPool, +#[allow(clippy::too_many_arguments)] +fn handle_known_version( + conn: &mut Connection, actor_id: ActorId, is_local: bool, version: i64, - known_version: &KnownDbVersion, - mut seqs_needed: Vec>, + init_known: KnownDbVersion, + booked: &Booked, + seqs_needed: Vec>, + last_seq: i64, + ts: Timestamp, sender: &Sender, ) -> eyre::Result<()> { - let conn = pool.read().await?; - - block_in_place(|| { - match known_version { - KnownDbVersion::Current { - db_version, - last_seq, - ts, - } => { - if seqs_needed.is_empty() { - seqs_needed = vec![(0..=*last_seq)]; - } - - for range_needed in seqs_needed { - let start_seq = range_needed.start(); - let end_seq = range_needed.end(); - - let mut prepped = conn.prepare_cached(r#" - SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl - FROM crsql_changes - WHERE site_id IS ? - AND db_version = ? - AND seq >= ? AND seq <= ? - ORDER BY seq ASC - "#)?; - let site_id: Option<[u8; 16]> = (!is_local) - .then_some(actor_id) - .map(|actor_id| actor_id.to_bytes()); - - let rows = prepped.query_map( - params![site_id, db_version, start_seq, end_seq], - row_to_change, - )?; - - // FIXME: at this point, we don't need to lock anymore, I don't think! - - let chunked = ChunkedChanges::new( - rows, - *start_seq, - *end_seq, - // TODO: make this adaptive based on how long it takes to send changes - MAX_CHANGES_BYTES_PER_MESSAGE, - ); - for changes_seqs in chunked { - match changes_seqs { - Ok((changes, seqs)) => { - debug!(%actor_id, version, "sending fully applied changes {}..={} (requested: {range_needed:?}) (len: {})", seqs.start(), seqs.end(), changes.len()); - tokio::spawn({ - let sender = sender.clone(); - let last_seq = *last_seq; - let ts = *ts; - async move { - if let Outcome::Preempted(_) = sender - .send(SyncMessage::V1(SyncMessageV1::Changeset( - ChangeV1 { - actor_id, - changeset: Changeset::Full { - version, - changes, - seqs, - last_seq, - ts, - }, - }, - ))) - .with_timeout(Duration::from_secs(5)) - .await - { - error!("timed out sending chunk of changes"); - } - } - }); - if sender.is_closed() { - eyre::bail!("sync message sender channel is closed"); - } - } - Err(e) => { - error!("could not process crsql change (db_version: {db_version}) for broadcast: {e}"); - break; - } + let mut seqs_iter = seqs_needed.into_iter(); + while let Some(range_needed) = seqs_iter.by_ref().next() { + match &init_known { + KnownDbVersion::Current { db_version, .. } => { + let bw = booked.blocking_write(); + match bw.get(&version) { + Some(known) => { + // a current version cannot go back to a partial version + if known.is_cleared() { + debug!(%actor_id, version, "in-memory bookkeeping has been cleared, aborting."); + break; } } + None => { + warn!(%actor_id, version, "in-memory bookkeeping vanished, aborting."); + break; + } } + + // this is a read transaction! + let tx = conn.transaction()?; + + let mut prepped = tx.prepare_cached(r#" + SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl + FROM crsql_changes + WHERE site_id IS ? + AND db_version = ? + AND seq >= ? AND seq <= ? + ORDER BY seq ASC + "#)?; + let site_id: Option<[u8; 16]> = (!is_local) + .then_some(actor_id) + .map(|actor_id| actor_id.to_bytes()); + + let start_seq = range_needed.start(); + let end_seq = range_needed.end(); + + let rows = prepped.query_map( + params![site_id, db_version, start_seq, end_seq], + row_to_change, + )?; + + // drop write lock! + drop(bw); + + send_change_chunks( + sender, + ChunkedChanges::new(rows, *start_seq, *end_seq, MAX_CHANGES_BYTES_PER_MESSAGE), + actor_id, + version, + last_seq, + ts, + )?; } - // NOTE: still not sure if this is safe, probably going to disable it. - KnownDbVersion::Partial { seqs, last_seq, ts } => { - // return Ok(()); - debug!("seqs needed: {seqs_needed:?}"); - debug!("seqs we got: {seqs:?}"); - if seqs_needed.is_empty() { - seqs_needed = vec![(0..=*last_seq)]; - } + KnownDbVersion::Partial { seqs, .. } => { + let mut partial_seqs = seqs.clone(); + let mut range_needed = range_needed.clone(); - for range_needed in seqs_needed { - for range in seqs.overlapping(&range_needed) { + let mut last_sent_seq = None; + + 'outer: loop { + let overlapping: Vec> = + partial_seqs.overlapping(&range_needed).cloned().collect(); + + for range in overlapping { // since there can be partial overlap, we need to only // send back the specific range we have or else we risk // sending bad data and creating inconsistencies @@ -500,7 +476,63 @@ async fn process_version( debug!("partial, effective range: {start_seq}..={end_seq}"); - let mut prepped = conn.prepare_cached( + let bw = booked.blocking_write(); + let maybe_db_version = match bw.get(&version) { + Some(known) => match known { + KnownDbVersion::Partial { seqs, .. } => { + if seqs != &partial_seqs { + info!(%actor_id, version, "different partial sequences, updating! range_needed: {range_needed:?}"); + partial_seqs = seqs.clone(); + if let Some(new_start_seq) = last_sent_seq.take() { + range_needed = + (new_start_seq + 1)..=*range_needed.end(); + } + continue 'outer; + } + None + } + KnownDbVersion::Current { db_version, .. } => Some(*db_version), + KnownDbVersion::Cleared => { + debug!(%actor_id, version, "in-memory bookkeeping has been cleared, aborting."); + break; + } + }, + None => { + warn!(%actor_id, version, "in-memory bookkeeping vanished!"); + break; + } + }; + + if let Some(db_version) = maybe_db_version { + info!(%actor_id, version, "switched from partial to current version"); + drop(bw); + let mut seqs_needed: Vec> = seqs_iter.collect(); + if let Some(new_start_seq) = last_sent_seq.take() { + range_needed = (new_start_seq + 1)..=*range_needed.end(); + } + seqs_needed.insert(0, range_needed); + return handle_known_version( + conn, + actor_id, + is_local, + version, + KnownDbVersion::Current { + db_version, + last_seq, + ts, + }, + booked, + seqs_needed, + last_seq, + ts, + sender, + ); + } + + // this is a read transaction! + let tx = conn.transaction()?; + + let mut prepped = tx.prepare_cached( r#" SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl FROM __corro_buffered_changes @@ -517,72 +549,137 @@ async fn process_version( row_to_change, )?; - let chunked = ChunkedChanges::new( - rows, - *start_seq, - *end_seq, - MAX_CHANGES_BYTES_PER_MESSAGE, - ); - for changes_seqs in chunked { - match changes_seqs { - Ok((changes, seqs)) => { - if seqs.end() < seqs.start() { - warn!(%actor_id, version, "UH OH, was going to {}..={} and that's not allowed (end < start). full range: {start_seq}..={end_seq}, last_seq: {last_seq}", seqs.start(), seqs.end()); - return Ok(()); - } - debug!(%actor_id, version, "sending partially buffered changes {}..={} (len: {})", seqs.start(), seqs.end(), changes.len()); - tokio::spawn({ - let sender = sender.clone(); - let last_seq = *last_seq; - let ts = *ts; - async move { - if let Outcome::Preempted(_) = sender - .send(SyncMessage::V1(SyncMessageV1::Changeset( - ChangeV1 { - actor_id, - changeset: Changeset::Full { - version, - changes, - seqs, - last_seq, - ts, - }, - }, - ))) - .with_timeout(Duration::from_secs(2)) - .await - { - error!("timed out sending chunk of changes"); - } - } - }); - if sender.is_closed() { - eyre::bail!("sync message sender channel is closed"); - } - } - Err(e) => { - error!("could not process buffered crsql change (version: {version}) for broadcast: {e}"); - break; - } - } - } + // drop write lock! + drop(bw); + + send_change_chunks( + sender, + ChunkedChanges::new( + rows, + *start_seq, + *end_seq, + MAX_CHANGES_BYTES_PER_MESSAGE, + ), + actor_id, + version, + last_seq, + ts, + )?; + debug!(%actor_id, version, "done sending chunks of partial changes"); + + last_sent_seq = Some(*end_seq); } + break; } } - KnownDbVersion::Cleared => { + KnownDbVersion::Cleared => unreachable!(), + } + } + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +async fn process_version( + pool: &SplitPool, + actor_id: ActorId, + is_local: bool, + version: i64, + known_version: KnownDbVersion, + booked: &Booked, + mut seqs_needed: Vec>, + sender: &Sender, +) -> eyre::Result<()> { + let mut conn = pool.read().await?; + + let (last_seq, ts) = { + let (last_seq, ts) = match &known_version { + KnownDbVersion::Current { last_seq, ts, .. } => (*last_seq, *ts), + KnownDbVersion::Partial { last_seq, ts, .. } => (*last_seq, *ts), + KnownDbVersion::Cleared => return Ok(()), + }; + if seqs_needed.is_empty() { + seqs_needed = vec![(0..=last_seq)]; + } + + (last_seq, ts) + }; + + block_in_place(|| { + handle_known_version( + &mut conn, + actor_id, + is_local, + version, + known_version, + booked, + seqs_needed, + last_seq, + ts, + sender, + ) + })?; + + trace!("done processing version: {version} for actor_id: {actor_id}"); + + Ok(()) +} + +fn send_change_chunks>>( + sender: &Sender, + mut chunked: ChunkedChanges, + actor_id: ActorId, + version: i64, + last_seq: i64, + ts: Timestamp, +) -> eyre::Result<()> { + let mut max_buf_size = chunked.max_buf_size(); + loop { + if sender.is_closed() { + eyre::bail!("sync message sender channel is closed"); + } + match chunked.next() { + Some(Ok((changes, seqs))) => { + let start = Instant::now(); + sender.blocking_send(SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { actor_id, - changeset: Changeset::Empty { - versions: version..=version, + changeset: Changeset::Full { + version, + changes, + seqs, + last_seq, + ts, }, })))?; + + let elapsed = start.elapsed(); + + if elapsed > Duration::from_secs(5) { + eyre::bail!("time out: peer is too slow"); + } + + if elapsed > ADAPT_CHUNK_SIZE_THRESHOLD { + if max_buf_size <= MIN_CHANGES_BYTES_PER_MESSAGE { + eyre::bail!("time out: peer is too slow even after reducing throughput"); + } + + max_buf_size /= 2; + debug!("adapting max chunk size to {max_buf_size} bytes"); + + chunked.set_max_buf_size(max_buf_size); + } + } + Some(Err(e)) => { + error!(%actor_id, version, "could not process changes to send via sync: {e}"); + break; + } + None => { + break; } } - Ok(()) - })?; - - trace!("done processing version: {version} for actor_id: {actor_id}"); + } Ok(()) } @@ -623,8 +720,7 @@ async fn process_sync( // 2. process partial needs if let Some(partially_needed) = sync_state.partial_need.get(&actor_id) { for (version, seqs_needed) in partially_needed.iter() { - let bw = booked.write().await; - let known = bw.get(version); + let known = { booked.read().await.get(version).cloned() }; if let Some(known) = known { process_version( &pool, @@ -632,6 +728,7 @@ async fn process_sync( is_local, *version, known, + &booked, seqs_needed.clone(), &sender, ) @@ -799,6 +896,9 @@ pub async fn bidirectional_sync( async move { let mut count = 0; + // changes buffer + let mut buf = Vec::with_capacity(4); + loop { match read_sync_msg(&mut read).await { Ok(None) => { @@ -811,9 +911,7 @@ pub async fn bidirectional_sync( Ok(Some(msg)) => match msg { SyncMessage::V1(SyncMessageV1::Changeset(change)) => { let len = change.len(); - process_single_change(agent, change) - .await - .map_err(SyncRecvError::from)?; + buf.push(change); count += len; } SyncMessage::V1(SyncMessageV1::State(_)) => { @@ -826,8 +924,18 @@ pub async fn bidirectional_sync( } }, } + + if buf.len() == buf.capacity() { + process_multiple_changes(agent, buf.drain(..).collect()) + .await + .map_err(SyncRecvError::from)?; + } } + process_multiple_changes(agent, buf.drain(..).collect()) + .await + .map_err(SyncRecvError::from)?; + debug!(actor_id = %agent.actor_id(), "done reading sync messages"); counter!("corro.sync.changes.recv", count as u64, "actor_id" => their_actor_id.to_string()); diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs index 2c256d8e..953bb255 100644 --- a/crates/corro-agent/src/api/public/mod.rs +++ b/crates/corro-agent/src/api/public/mod.rs @@ -44,7 +44,7 @@ pub struct ChunkedChanges { last_pushed_seq: i64, last_start_seq: i64, last_seq: i64, - max_byte_size: usize, + max_buf_size: usize, buffered_size: usize, done: bool, } @@ -53,18 +53,26 @@ impl ChunkedChanges where I: Iterator, { - pub fn new(iter: I, start_seq: i64, last_seq: i64, max_byte_size: usize) -> Self { + pub fn new(iter: I, start_seq: i64, last_seq: i64, max_buf_size: usize) -> Self { Self { iter: iter.peekable(), changes: vec![], last_pushed_seq: 0, last_start_seq: start_seq, last_seq, - max_byte_size, + max_buf_size, buffered_size: 0, done: false, } } + + pub fn max_buf_size(&self) -> usize { + self.max_buf_size + } + + pub fn set_max_buf_size(&mut self, size: usize) { + self.max_buf_size = size; + } } impl Iterator for ChunkedChanges @@ -101,7 +109,7 @@ where break; } - if self.buffered_size >= self.max_byte_size { + if self.buffered_size >= self.max_buf_size { // chunking it up let start_seq = self.last_start_seq; diff --git a/crates/corro-agent/src/transport.rs b/crates/corro-agent/src/transport.rs index 8c52efff..2158d3ad 100644 --- a/crates/corro-agent/src/transport.rs +++ b/crates/corro-agent/src/transport.rs @@ -105,7 +105,7 @@ impl Transport { if let Some(conn) = r.get(&addr).cloned() { if test_conn(&conn) { if let Err(e) = self.0.rtt_tx.try_send((addr, conn.rtt())) { - warn!("could not send RTT for connection through sender: {e}"); + debug!("could not send RTT for connection through sender: {e}"); } return Ok(conn); } @@ -122,7 +122,7 @@ impl Transport { let conn = self.0.endpoint.connect(addr, server_name.as_str())?.await?; if let Err(e) = self.0.rtt_tx.try_send((addr, conn.rtt())) { - warn!("could not send RTT for connection through sender: {e}"); + debug!("could not send RTT for connection through sender: {e}"); } w.insert(addr, conn.clone()); conn diff --git a/crates/corro-types/src/agent.rs b/crates/corro-types/src/agent.rs index 9223e574..c2b6aaf0 100644 --- a/crates/corro-types/src/agent.rs +++ b/crates/corro-types/src/agent.rs @@ -431,19 +431,25 @@ impl<'a> DerefMut for WriteConn<'a> { #[derive(Debug, Clone, Eq, PartialEq)] pub enum KnownDbVersion { - Current { - db_version: i64, + Partial { + seqs: RangeInclusiveSet, last_seq: i64, ts: Timestamp, }, - Partial { - seqs: RangeInclusiveSet, + Current { + db_version: i64, last_seq: i64, ts: Timestamp, }, Cleared, } +impl KnownDbVersion { + pub fn is_cleared(&self) -> bool { + matches!(self, KnownDbVersion::Cleared) + } +} + pub type BookedVersions = RangeInclusiveMap; pub type BookedInner = Arc>; diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs index 163892bd..b7e5c7c2 100644 --- a/crates/corro-types/src/broadcast.rs +++ b/crates/corro-types/src/broadcast.rs @@ -95,6 +95,18 @@ pub enum Changeset { }, } +impl From for Changeset { + fn from(value: ChangesetParts) -> Self { + Changeset::Full { + version: value.version, + changes: value.changes, + seqs: value.seqs, + last_seq: value.last_seq, + ts: value.ts, + } + } +} + pub struct ChangesetParts { pub version: i64, pub changes: Vec,