From 5af182a1bf2c901f3aa0dece7f172c7e18a9fa9b Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Fri, 5 Jul 2024 15:04:28 +0100 Subject: [PATCH] calculate cost bettre --- crates/corro-agent/src/agent/handlers.rs | 8 ++++++-- crates/corro-types/src/agent.rs | 16 +++++++--------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 4757af19..32e5aa1d 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -431,7 +431,8 @@ pub async fn handle_emptyset( Some(change) => { if let Changeset::EmptySet { versions, ts } = change.changeset { buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts)); - cost += versions.len(); + cost += versions.iter().map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)).sum::(); + println!("cost - {cost}"); } else { warn!("received non-emptyset changes in emptyset channel from {}", change.actor_id); } @@ -451,12 +452,14 @@ pub async fn handle_emptyset( } if process { + for (actor, changes) in &mut buf { while !changes.is_empty() { let change = changes.pop_front().unwrap(); match process_emptyset(agent.clone(), bookie.clone(), *actor, &change).await { Ok(()) => { - cost -= change.0.len(); + // cost -= change.0.len(); + cost -= change.0.iter().map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)).sum::(); } Err(e) => { warn!("encountered error when processing emptyset - {e}"); @@ -467,6 +470,7 @@ pub async fn handle_emptyset( } } } + } println!("shutting down handle empties loop"); diff --git a/crates/corro-types/src/agent.rs b/crates/corro-types/src/agent.rs index d07ccfe7..b74e0252 100644 --- a/crates/corro-types/src/agent.rs +++ b/crates/corro-types/src/agent.rs @@ -255,7 +255,6 @@ pub fn migrate(clock: Arc, conn: &mut Connection) -> rusqlite::Result Box::new(crsqlite_v0_16_migration as fn(&Transaction) -> rusqlite::Result<()>), Box::new(create_bookkeeping_gaps as fn(&Transaction) -> rusqlite::Result<()>), Box::new(create_impacted_versions as fn(&Transaction) -> rusqlite::Result<()>), - Box::new(create_ts_index_bookkeeping_table), Box::new(create_sync_state(clock)), ]; @@ -360,21 +359,20 @@ fn refactor_corro_members(tx: &Transaction) -> rusqlite::Result<()> { ) } -fn create_ts_index_bookkeeping_table(tx: &Transaction) -> rusqlite::Result<()> { - tx.execute_batch( - r#" +fn create_sync_state(clock: Arc) -> impl Fn(&Transaction) -> rusqlite::Result<()> { + let ts = Timestamp::from(clock.new_timestamp()); + + move |tx: &Transaction| -> rusqlite::Result<()> { + tx.execute_batch( + r#" CREATE INDEX index__corro_bookkeeping_ts ON __corro_bookkeeping (actor_id, ts ASC); CREATE TABLE __corro_sync_state ( actor_id BLOB PRIMARY KEY NOT NULL, last_cleared_ts TEXT ); "#, - ) -} -fn create_sync_state(clock: Arc) -> impl Fn(&Transaction) -> rusqlite::Result<()> { - let ts = Timestamp::from(clock.new_timestamp()); + )?; - move |tx: &Transaction| -> rusqlite::Result<()> { tx.execute( r#" UPDATE __corro_bookkeeping SET ts = ?