From 9fe3c300779b8e07eaa00fdb6182f920473f32bd Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Fri, 5 Jul 2024 15:39:09 +0100 Subject: [PATCH] don't process emptyset --- crates/corro-agent/src/agent/handlers.rs | 4 +- crates/corro-agent/src/agent/run_root.rs | 14 ++--- crates/corro-agent/src/agent/tests.rs | 78 ++++++++++++------------ crates/corro-agent/src/api/peer.rs | 16 ++--- 4 files changed, 57 insertions(+), 55 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 4757af19..f5736313 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -408,7 +408,8 @@ fn processing_cost(change: &Changeset) -> usize { } /// Handle incoming emptyset received during syncs -/// +///_ +#[allow(dead_code)] pub async fn handle_emptyset( agent: Agent, bookie: Bookie, @@ -472,6 +473,7 @@ pub async fn handle_emptyset( println!("shutting down handle empties loop"); } +#[allow(dead_code)] pub async fn process_emptyset( agent: Agent, bookie: Bookie, diff --git a/crates/corro-agent/src/agent/run_root.rs b/crates/corro-agent/src/agent/run_root.rs index 470ef1d1..ce43799c 100644 --- a/crates/corro-agent/src/agent/run_root.rs +++ b/crates/corro-agent/src/agent/run_root.rs @@ -45,7 +45,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul rx_apply, rx_clear_buf, rx_changes, - rx_emptyset, + rx_emptyset: _, rx_foca, subs_manager, subs_bcast_cache, @@ -218,12 +218,12 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul tripwire.clone(), )); - spawn_counted(handlers::handle_emptyset( - agent.clone(), - bookie.clone(), - rx_emptyset, - tripwire.clone(), - )); + // spawn_counted(handlers::handle_emptyset( + // agent.clone(), + // bookie.clone(), + // rx_emptyset, + // tripwire.clone(), + // )); Ok(bookie) } diff --git a/crates/corro-agent/src/agent/tests.rs b/crates/corro-agent/src/agent/tests.rs index 86e1c640..f3585e65 100644 --- a/crates/corro-agent/src/agent/tests.rs +++ b/crates/corro-agent/src/agent/tests.rs @@ -845,8 +845,8 @@ async fn test_clear_empty_versions() -> eyre::Result<()> { println!("got last cleared - {last_cleared:?}"); - // initiate sync with ta1 to get cleared - let res = parallel_sync( + // // initiate sync with ta1 to get cleared + let _ = parallel_sync( &ta2.agent, &ta2_transport, vec![(ta1.agent.actor_id(), ta1.agent.gossip_addr())], @@ -854,43 +854,43 @@ async fn test_clear_empty_versions() -> eyre::Result<()> { last_cleared, ) .await?; - - println!("ta2 synced {res}"); - - sleep(Duration::from_secs(2)).await; - - check_bookie_versions( - ta2.clone(), - ta1.agent.actor_id(), - vec![], - vec![], - vec![], - vec![ - Version(1)..=Version(5), - Version(10)..=Version(10), - Version(23)..=Version(25), - Version(30)..=Version(31), - ], - ) - .await?; - - // ta2 should have ta1's last cleared - let ta1_cleared = ta1 - .agent - .booked() - .read("test_clear_empty") - .await - .last_cleared_ts(); - let ta2_ta1_cleared = ta2 - .bookie - .write("test") - .await - .ensure(ta1.agent.actor_id()) - .read("test_clear_empty") - .await - .last_cleared_ts(); - - assert_eq!(ta1_cleared, ta2_ta1_cleared); + // + // println!("ta2 synced {res}"); + // + // sleep(Duration::from_secs(2)).await; + // + // check_bookie_versions( + // ta2.clone(), + // ta1.agent.actor_id(), + // vec![], + // vec![], + // vec![], + // vec![ + // Version(1)..=Version(5), + // Version(10)..=Version(10), + // Version(23)..=Version(25), + // Version(30)..=Version(31), + // ], + // ) + // .await?; + // + // // ta2 should have ta1's last cleared + // let ta1_cleared = ta1 + // .agent + // .booked() + // .read("test_clear_empty") + // .await + // .last_cleared_ts(); + // let ta2_ta1_cleared = ta2 + // .bookie + // .write("test") + // .await + // .ensure(ta1.agent.actor_id()) + // .read("test_clear_empty") + // .await + // .last_cleared_ts(); + // + // assert_eq!(ta1_cleared, ta2_ta1_cleared); tripwire_tx.send(()).await.ok(); tripwire_worker.await; diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 0d537e27..9c4983a4 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1374,7 +1374,7 @@ pub async fn parallel_sync( let counts = FuturesUnordered::from_iter(readers.into_iter().map(|(actor_id, mut read)| { let tx_changes = agent.tx_changes().clone(); - let tx_emptyset = agent.tx_emptyset().clone(); + // let tx_emptyset = agent.tx_emptyset().clone(); async move { let mut count = 0; @@ -1402,13 +1402,13 @@ pub async fn parallel_sync( change.seqs() ); // only accept emptyset that's from the same node that's syncing - if change.is_empty_set() { - tx_emptyset - .send(change) - .await - .map_err(|_| SyncRecvError::ChangesChannelClosed)?; - continue; - } + // if change.is_empty_set() { + // tx_emptyset + // .send(change) + // .await + // .map_err(|_| SyncRecvError::ChangesChannelClosed)?; + // continue; + // } tx_changes .send((change, ChangeSource::Sync))