Skip to content

Commit

Permalink
Implement priority broadcasts for ring0 nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
spacekookie committed Mar 5, 2024
1 parent e9b9d75 commit b3e7ec5
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 12 deletions.
2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub fn spawn_incoming_connection_handlers(

// Spawn handler tasks for this connection
spawn_foca_handler(&agent, &tripwire, &conn);
uni::spawn_unipayload_handler(&tripwire, &conn, agent.clone());
uni::spawn_unipayload_handler(agent.clone(), bookie.clone(), &tripwire, &conn);
bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn);
});
}
Expand Down
53 changes: 42 additions & 11 deletions crates/corro-agent/src/agent/uni.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::agent::util::process_multiple_changes;
use corro_types::{
agent::Agent,
agent::{Agent, Bookie},
broadcast::{BroadcastV1, ChangeSource, UniPayload, UniPayloadV1},
};
use metrics::counter;
Expand All @@ -11,7 +12,12 @@ use tripwire::Tripwire;

/// Spawn a task that accepts unidirectional broadcast streams, then
/// spawns another task for each incoming stream to handle.
pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, agent: Agent) {
pub fn spawn_unipayload_handler(
agent: Agent,
bookie: Bookie,
tripwire: &Tripwire,
conn: &quinn::Connection,
) {
tokio::spawn({
let conn = conn.clone();
let mut tripwire = tripwire.clone();
Expand Down Expand Up @@ -40,6 +46,7 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a

tokio::spawn({
let agent = agent.clone();
let bookie = bookie.clone();
async move {
let mut framed = FramedRead::new(rx, LengthDelimitedCodec::new());

Expand All @@ -59,19 +66,43 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a
change,
)),
cluster_id,
priority,
} => {
if cluster_id != agent.cluster_id() {
continue;
}
if let Err(e) = agent
.tx_changes()
.send((change, ChangeSource::Broadcast))
.await
{
error!(
"could not send change for processing: {e}"
);
return;

if priority {
let agent = agent.clone();
let bookie = bookie.clone();

tokio::spawn(async move {
if let Err(e) =
process_multiple_changes(
agent,
bookie,
vec![(
change,
ChangeSource::Broadcast,
std::time::Instant::now(),
)],
)
.await
{
error!("Process priority change failed: {:?}", e);
}
});
} else {
if let Err(e) = agent
.tx_changes()
.send((change, ChangeSource::Broadcast))
.await
{
error!(
"could not send change for processing: {e}"
);
return;
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/corro-agent/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ pub fn runtime_loop(
if let Err(e) = (UniPayload::V1 {
data: UniPayloadV1::Broadcast(bcast.clone()),
cluster_id: agent.cluster_id(),
priority: is_local,
})
.write_to_stream((&mut ser_buf).writer())
{
Expand Down
2 changes: 2 additions & 0 deletions crates/corro-types/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub enum UniPayload {
data: UniPayloadV1,
#[speedy(default_on_eof)]
cluster_id: ClusterId,
#[speedy(default_on_eof)]
priority: bool,
},
}

Expand Down

0 comments on commit b3e7ec5

Please sign in to comment.