Skip to content

Commit

Permalink
calculate cost bettre
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Jul 5, 2024
1 parent 4163219 commit 5af182a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
8 changes: 6 additions & 2 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,8 @@ pub async fn handle_emptyset(
Some(change) => {
if let Changeset::EmptySet { versions, ts } = change.changeset {
buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts));
cost += versions.len();
cost += versions.iter().map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)).sum::<usize>();
println!("cost - {cost}");
} else {
warn!("received non-emptyset changes in emptyset channel from {}", change.actor_id);
}
Expand All @@ -451,12 +452,14 @@ pub async fn handle_emptyset(
}

if process {

for (actor, changes) in &mut buf {
while !changes.is_empty() {
let change = changes.pop_front().unwrap();
match process_emptyset(agent.clone(), bookie.clone(), *actor, &change).await {
Ok(()) => {
cost -= change.0.len();
// cost -= change.0.len();
cost -= change.0.iter().map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)).sum::<usize>();
}
Err(e) => {
warn!("encountered error when processing emptyset - {e}");
Expand All @@ -467,6 +470,7 @@ pub async fn handle_emptyset(
}
}
}

}

println!("shutting down handle empties loop");
Expand Down
16 changes: 7 additions & 9 deletions crates/corro-types/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ pub fn migrate(clock: Arc<uhlc::HLC>, conn: &mut Connection) -> rusqlite::Result
Box::new(crsqlite_v0_16_migration as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(create_bookkeeping_gaps as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(create_impacted_versions as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(create_ts_index_bookkeeping_table),
Box::new(create_sync_state(clock)),
];

Expand Down Expand Up @@ -360,21 +359,20 @@ fn refactor_corro_members(tx: &Transaction) -> rusqlite::Result<()> {
)
}

fn create_ts_index_bookkeeping_table(tx: &Transaction) -> rusqlite::Result<()> {
tx.execute_batch(
r#"
fn create_sync_state(clock: Arc<uhlc::HLC>) -> impl Fn(&Transaction) -> rusqlite::Result<()> {
let ts = Timestamp::from(clock.new_timestamp());

move |tx: &Transaction| -> rusqlite::Result<()> {
tx.execute_batch(
r#"
CREATE INDEX index__corro_bookkeeping_ts ON __corro_bookkeeping (actor_id, ts ASC);
CREATE TABLE __corro_sync_state (
actor_id BLOB PRIMARY KEY NOT NULL,
last_cleared_ts TEXT
);
"#,
)
}
fn create_sync_state(clock: Arc<uhlc::HLC>) -> impl Fn(&Transaction) -> rusqlite::Result<()> {
let ts = Timestamp::from(clock.new_timestamp());
)?;

move |tx: &Transaction| -> rusqlite::Result<()> {
tx.execute(
r#"
UPDATE __corro_bookkeeping SET ts = ?
Expand Down

0 comments on commit 5af182a

Please sign in to comment.