Skip to content

Commit

Permalink
Cleanup logs and re-add auto cluster rejoin with a randomish delay
Browse files Browse the repository at this point in the history
  • Loading branch information
spacekookie committed Mar 5, 2024
1 parent 5c9763a commit c4fc8c8
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 13 deletions.
10 changes: 8 additions & 2 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ pub async fn handle_notifications(
match notification {
Notification::MemberUp(actor) => {
let member_added_res = agent.members().write().add_member(&actor);
trace!("Member Up {actor:?} (result: {member_added_res:?})");
info!("Member Up {actor:?} (result: {member_added_res:?})");

match member_added_res {
MemberAddedResult::NewMember => {
Expand All @@ -285,6 +285,10 @@ pub async fn handle_notifications(
// anything else to do here?
}
MemberAddedResult::Ignored => {
// TODO: it's unclear whether the block below is still needed.
// It was commented out while debugging a foca member state
// issue and may have to be re-enabled.

// if let Err(e) = agent
// .tx_foca()
// .send(FocaInput::ApplyMany(vec![foca::Member::new(
Expand All @@ -302,7 +306,7 @@ pub async fn handle_notifications(
}
Notification::MemberDown(actor) => {
let removed = { agent.members().write().remove_member(&actor) };
trace!("Member Down {actor:?} (removed: {removed})");
info!("Member Down {actor:?} (removed: {removed})");
if removed {
debug!("Member Down {actor:?}");
counter!("corro.gossip.member.removed", "id" => actor.id().0.to_string(), "addr" => actor.addr().to_string()).increment(1);
Expand Down Expand Up @@ -645,6 +649,7 @@ pub async fn handle_sync(
debug!("found {} candidates to synchronize with", candidates.len());

let desired_count = cmp::max(cmp::min(candidates.len() / 100, 10), 3);
debug!("Selected {desired_count} nodes to sync with");

let mut rng = StdRng::from_entropy();

Expand All @@ -668,6 +673,7 @@ pub async fn handle_sync(
.collect()
};

trace!("Sync set: {chosen:?}");
if chosen.is_empty() {
return Ok(());
}
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/uni.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use metrics::counter;
use speedy::Readable;
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use tracing::{debug, error, trace};
use tracing::{debug, error, trace, warn};
use tripwire::Tripwire;

/// Spawn a task that accepts unidirectional broadcast streams, then
Expand Down
5 changes: 4 additions & 1 deletion crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ pub async fn initialise_foca(agent: &Agent) {

let agent = agent.clone();
tokio::task::spawn(async move {
tokio::time::sleep(Duration::from_secs(60)).await;
// Add 0-14 seconds of random scatter to the task sleep (25-39s
// in total) so that restarted nodes don't all rejoin at once
let scatter = rand::random::<u64>() % 15;
tokio::time::sleep(Duration::from_secs(25 + scatter)).await;

async fn apply_rejoin(agent: &Agent) -> eyre::Result<()> {
let (cb_tx, cb_rx) = tokio::sync::oneshot::channel();
Expand Down
20 changes: 12 additions & 8 deletions crates/corro-agent/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,13 @@ pub fn runtime_loop(
}
FocaInput::Cmd(cmd) => match cmd {
FocaCmd::Rejoin(callback) => {
if callback
.send(foca.change_identity(
foca.identity().renew().unwrap(),
&mut runtime,
))
.is_err()
{
let renewed = foca.identity().renew().unwrap();
debug!("handling FocaInput::Rejoin {renewed:?}");

let new_id = foca.change_identity(renewed, &mut runtime);
info!("New identity: {new_id:?}");

if callback.send(new_id).is_err() {
warn!("could not send back result after rejoining cluster");
}
}
Expand Down Expand Up @@ -735,7 +735,11 @@ fn diff_member_states(

fn make_foca_config(cluster_size: NonZeroU32) -> foca::Config {
let mut config = foca::Config::new_wan(cluster_size);
config.remove_down_after = Duration::from_secs(2 * 24 * 60 * 60);
// NOTE: this value used to be 2 * 24 * 3600 seconds (48h), set
// because of an earlier cluster membership issue. It is now 60
// seconds. We may want to change it back if we become sure that
// the new issues are unrelated, or if this change causes other
// problems.
config.remove_down_after = Duration::from_secs(60);

// max payload size for udp datagrams, use a safe value here...
// TODO: calculate from smallest max datagram size for all QUIC conns
Expand Down
1 change: 0 additions & 1 deletion crates/corro-agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@ pub mod agent;
pub mod api;
pub mod broadcast;
pub mod transport;

0 comments on commit c4fc8c8

Please sign in to comment.