Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function to handle subscriptions when applying buffered changes #214

Merged
merged 8 commits into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ pub async fn configurable_stress_test(
}
if v.len() == agents.len()
&& v.iter()
.all(|(n, needed)| *n == changes_count as i64 && *needed == 0)
.all(|(n, needed)| *n == changes_count && *needed == 0)
{
break;
}
Expand Down
236 changes: 126 additions & 110 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,62 +505,63 @@ pub async fn process_fully_buffered_changes(
actor_id: ActorId,
version: Version,
) -> Result<bool, ChangeError> {
let mut conn = agent.pool().write_normal().await?;
debug!(%actor_id, %version, "acquired write (normal) connection to process fully buffered changes");
let db_version = {
let mut conn = agent.pool().write_normal().await?;
debug!(%actor_id, %version, "acquired write (normal) connection to process fully buffered changes");

let booked = {
bookie
.write(format!(
"process_fully_buffered(ensure):{}",
actor_id.as_simple()
))
.await
.ensure(actor_id)
};

let booked = {
bookie
let mut bookedw = booked
.write(format!(
"process_fully_buffered(ensure):{}",
"process_fully_buffered(booked writer):{}",
actor_id.as_simple()
))
.await
.ensure(actor_id)
};

let mut bookedw = booked
.write(format!(
"process_fully_buffered(booked writer):{}",
actor_id.as_simple()
))
.await;
debug!(%actor_id, %version, "acquired Booked write lock to process fully buffered changes");

let inserted = block_in_place(|| {
let (last_seq, ts) = {
match bookedw.partials.get(&version) {
Some(PartialVersion { seqs, last_seq, ts }) => {
if seqs.gaps(&(CrsqlSeq(0)..=*last_seq)).count() != 0 {
error!(%actor_id, %version, "found sequence gaps: {:?}, aborting!", seqs.gaps(&(CrsqlSeq(0)..=*last_seq)).collect::<RangeInclusiveSet<CrsqlSeq>>());
// TODO: return an error here
return Ok(false);
.await;
debug!(%actor_id, %version, "acquired Booked write lock to process fully buffered changes");

block_in_place(|| {
let (last_seq, ts) = {
match bookedw.partials.get(&version) {
Some(PartialVersion { seqs, last_seq, ts }) => {
if seqs.gaps(&(CrsqlSeq(0)..=*last_seq)).count() != 0 {
error!(%actor_id, %version, "found sequence gaps: {:?}, aborting!", seqs.gaps(&(CrsqlSeq(0)..=*last_seq)).collect::<RangeInclusiveSet<CrsqlSeq>>());
// TODO: return an error here
return Ok(None);
}
(*last_seq, *ts)
}
None => {
warn!(%actor_id, %version, "version not found in cache, returning");
return Ok(None);
}
(*last_seq, *ts)
}
None => {
warn!(%actor_id, %version, "version not found in cache, returning");
return Ok(false);
}
}
};
};

let tx = conn
.immediate_transaction()
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;
let tx = conn
.immediate_transaction()
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;

info!(%actor_id, %version, "Processing buffered changes to crsql_changes (actor: {actor_id}, version: {version}, last_seq: {last_seq})");
info!(%actor_id, %version, "Processing buffered changes to crsql_changes (actor: {actor_id}, version: {version}, last_seq: {last_seq})");

let max_db_version: Option<Option<CrsqlDbVersion>> = tx.prepare_cached("SELECT MAX(db_version) FROM __corro_buffered_changes WHERE site_id = ? AND version = ?").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?.query_row(params![actor_id.as_bytes(), version], |row| row.get(0)).optional().map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?;
let max_db_version: Option<Option<CrsqlDbVersion>> = tx.prepare_cached("SELECT MAX(db_version) FROM __corro_buffered_changes WHERE site_id = ? AND version = ?").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?.query_row(params![actor_id.as_bytes(), version], |row| row.get(0)).optional().map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?;

let start = Instant::now();
let start = Instant::now();

if let Some(max_db_version) = max_db_version.flatten() {
// insert all buffered changes into crsql_changes directly from the buffered changes table
let count = tx
if let Some(max_db_version) = max_db_version.flatten() {
// insert all buffered changes into crsql_changes directly from the buffered changes table
let count = tx
.prepare_cached(
r#"
INSERT INTO crsql_changes ("table", pk, cid, val, col_version, db_version, site_id, cl, seq)
Expand All @@ -572,42 +573,42 @@ pub async fn process_fully_buffered_changes(
"#,
).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?
.execute(params![max_db_version, actor_id.as_bytes(), version]).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?;
info!(%actor_id, %version, "Inserted {count} rows from buffered into crsql_changes in {:?}", start.elapsed());
} else {
info!(%actor_id, %version, "No buffered rows, skipped insertion into crsql_changes");
}

if let Err(e) = agent.tx_clear_buf().try_send((actor_id, version..=version)) {
error!("could not schedule buffered data clear: {e}");
}

let rows_impacted: i64 = tx
.prepare_cached("SELECT crsql_rows_impacted()")
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?
.query_row((), |row| row.get(0))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;
info!(%actor_id, %version, "Inserted {count} rows from buffered into crsql_changes in {:?}", start.elapsed());
} else {
info!(%actor_id, %version, "No buffered rows, skipped insertion into crsql_changes");
}

debug!(%actor_id, %version, "rows impacted by buffered changes insertion: {rows_impacted}");
if let Err(e) = agent.tx_clear_buf().try_send((actor_id, version..=version)) {
error!("could not schedule buffered data clear: {e}");
}

if rows_impacted > 0 {
let db_version: CrsqlDbVersion = tx
.query_row("SELECT crsql_next_db_version()", [], |row| row.get(0))
let rows_impacted: i64 = tx
.prepare_cached("SELECT crsql_rows_impacted()")
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?
.query_row((), |row| row.get(0))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;
debug!("db version: {db_version}");

tx.prepare_cached(
debug!(%actor_id, %version, "rows impacted by buffered changes insertion: {rows_impacted}");

let db_version = if rows_impacted > 0 {
let db_version: CrsqlDbVersion = tx
.query_row("SELECT crsql_next_db_version()", [], |row| row.get(0))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;
debug!("db version: {db_version}");

tx.prepare_cached(
"
INSERT OR IGNORE INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts)
VALUES (
Expand All @@ -617,55 +618,70 @@ pub async fn process_fully_buffered_changes(
:last_seq,
:ts
);",
).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?
.execute(named_params! {
":actor_id": actor_id,
":version": version,
":db_version": db_version,
":last_seq": last_seq,
":ts": ts
}).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?;
).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?
.execute(named_params! {
":actor_id": actor_id,
":version": version,
":db_version": db_version,
":last_seq": last_seq,
":ts": ts
}).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?;

debug!(%actor_id, %version, "inserted bookkeeping row after buffered insert");
Some(db_version)
} else {
store_empty_changeset(&tx, actor_id, version..=version)?;

debug!(%actor_id, %version, "inserted CLEARED bookkeeping row after buffered insert");
None
};

debug!(%actor_id, %version, "inserted bookkeeping row after buffered insert");
} else {
store_empty_changeset(&tx, actor_id, version..=version)?;
let mut snap = bookedw.snapshot();
snap.insert_db(&tx, [version..=version].into())
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;

debug!(%actor_id, %version, "inserted CLEARED bookkeeping row after buffered insert");
};
let overwritten =
find_overwritten_versions(&tx).map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;

let mut snap = bookedw.snapshot();
snap.insert_db(&tx, [version..=version].into())
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;
for (actor_id, versions_set) in overwritten {
for versions in versions_set {
store_empty_changeset(&tx, actor_id, versions)?;
}
}

let overwritten =
find_overwritten_versions(&tx).map_err(|source| ChangeError::Rusqlite {
tx.commit().map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;

for (actor_id, versions_set) in overwritten {
for versions in versions_set {
store_empty_changeset(&tx, actor_id, versions)?;
}
}

tx.commit().map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;
bookedw.commit_snapshot(snap);

bookedw.commit_snapshot(snap);
Ok::<_, ChangeError>(db_version)
})
}?;

Ok::<_, ChangeError>(true)
})?;
if let Some(db_version) = db_version {
let conn = agent.pool().read().await?;
block_in_place(|| {
if let Err(e) = agent
.subs_manager()
.match_changes_from_db_version(&conn, db_version)
{
error!(%db_version, "could not match changes from db version: {e}");
}
});
}

Ok(inserted)
Ok(db_version.is_some())
}

#[tracing::instrument(skip(agent, bookie, changes), err)]
Expand Down
Loading
Loading