Skip to content

Commit

Permalink
test order of broadcast and sync changes
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Nov 6, 2024
1 parent a9e861d commit ce51743
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 235 deletions.
2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub fn spawn_incoming_connection_handlers(

// Spawn handler tasks for this connection
spawn_foca_handler(&agent, &tripwire, &conn);
uni::spawn_unipayload_handler(&tripwire, &conn, agent.clone());
uni::spawn_unipayload_handler(&tripwire, &conn, agent.cluster_id(), agent.tx_changes().clone());
bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn);
});
}
Expand Down
1 change: 1 addition & 0 deletions crates/corro-agent/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use error::{SyncClientError, SyncRecvError};
pub use run_root::start_with_config;
pub use setup::{setup, AgentOptions};
pub use util::process_multiple_changes;
pub use uni::spawn_unipayload_handler;

pub const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
Expand Down
36 changes: 20 additions & 16 deletions crates/corro-agent/src/agent/uni.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use corro_types::{
agent::Agent,
broadcast::{BroadcastV1, ChangeSource, UniPayload, UniPayloadV1},
actor::ClusterId, broadcast::{BroadcastV1, ChangeSource, ChangeV1, UniPayload, UniPayloadV1}, channel::CorroSender
};
use metrics::counter;
use speedy::Readable;
Expand All @@ -11,7 +10,7 @@ use tripwire::Tripwire;

/// Spawn a task that accepts unidirectional broadcast streams, then
/// spawns another task for each incoming stream to handle.
pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, agent: Agent) {
pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, cluster_id: ClusterId, tx_changes: CorroSender<(ChangeV1, ChangeSource)>) {
tokio::spawn({
let conn = conn.clone();
let mut tripwire = tripwire.clone();
Expand Down Expand Up @@ -39,7 +38,7 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a
);

tokio::spawn({
let agent = agent.clone();
let tx_changes = tx_changes.clone();
async move {
let mut framed = FramedRead::new(
rx,
Expand All @@ -48,6 +47,7 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a
.new_codec(),
);

let mut changes = vec![];
loop {
match StreamExt::next(&mut framed).await {
Some(Ok(b)) => {
Expand All @@ -57,27 +57,19 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a
Ok(payload) => {
trace!("parsed a payload: {payload:?}");


match payload {
UniPayload::V1 {
data:
UniPayloadV1::Broadcast(BroadcastV1::Change(
change,
)),
cluster_id,
cluster_id: payload_cluster_id,
} => {
if cluster_id != agent.cluster_id() {
if cluster_id != payload_cluster_id {
continue;
}
if let Err(e) = agent
.tx_changes()
.send((change, ChangeSource::Broadcast))
.await
{
error!(
"could not send change for processing: {e}"
);
return;
}
changes.push((change, ChangeSource::Broadcast));
}
}
}
Expand All @@ -93,6 +85,18 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a
None => break,
}
}

for change in changes.into_iter().rev() {
if let Err(e) = tx_changes
.send(change)
.await
{
error!(
"could not send change for processing: {e}"
);
return;
}
}
}
});
}
Expand Down
60 changes: 60 additions & 0 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1717,9 +1717,12 @@ pub async fn serve_sync(

#[cfg(test)]
mod tests {
use crate::api::public::api_v1_transactions;
use axum::{Extension, Json};
use camino::Utf8PathBuf;
use corro_tests::launch_test_agent;
use corro_tests::TEST_SCHEMA;
use corro_types::api::Statement;
use corro_types::{
api::{ColumnName, TableName},
base::CrsqlDbVersion,
Expand All @@ -1739,6 +1742,63 @@ mod tests {

use super::*;

#[tokio::test(flavor = "multi_thread")]
async fn test_sync_changes_order() -> eyre::Result<()> {
_ = tracing_subscriber::fmt::try_init();

let (tripwire, _tripwire_worker, _tripwire_tx) = Tripwire::new_simple();

let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;

// create versions on the first node
let versions_range = 1..=100;
for i in versions_range.clone() {
let (status_code, body) = api_v1_transactions(
Extension(ta1.agent.clone()),
axum::Json(vec![Statement::WithParams(
"INSERT OR REPLACE INTO testsblob (id,text) VALUES (?,?)".into(),
vec![format!("service-id-{i}").into(), "service-name".into()],
)]),
)
.await;
assert_eq!(status_code, StatusCode::OK);

let version = body.0.version.unwrap();
assert_eq!(version, Version(i));
}

let dir = tempfile::tempdir()?;

let (ta2_agent, mut ta2_opts) = setup(
Config::builder()
.db_path(dir.path().join("corrosion.db").display().to_string())
.gossip_addr("127.0.0.1:0".parse()?)
.api_addr("127.0.0.1:0".parse()?)
.build()?,
tripwire,
)
.await?;

let members = vec![(ta1.agent.actor_id(), ta1.agent.gossip_addr())];
let _ = parallel_sync(
&ta2_agent,
&ta2_opts.transport,
members,
Default::default(),
HashMap::new(),
)
.await?;

for i in versions_range.rev() {
let changes = tokio::time::timeout(Duration::from_secs(5), ta2_opts.rx_changes.recv())
.await?
.unwrap();
assert_eq!(changes.0.versions(), Version(i)..=Version(i));
}

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_handle_need() -> eyre::Result<()> {
_ = tracing_subscriber::fmt::try_init();
Expand Down
Loading

0 comments on commit ce51743

Please sign in to comment.