Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(more) Gracefully leave SWIM cluster #36

Merged
merged 9 commits into from
Aug 24, 2023
9 changes: 6 additions & 3 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,8 +713,11 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
.and_then(|rows| rows.collect::<rusqlite::Result<Vec<String>>>())
{
Ok(foca_states) => {
foca_states.iter().filter_map(|state| match serde_json::from_str(state.as_str()) {
Ok(fs) => Some(fs),
foca_states.iter().filter_map(|state| match serde_json::from_str::<foca::Member<Actor>>(state.as_str()) {
Ok(fs) => match fs.state() {
foca::State::Suspect => None,
_ => Some(fs)
},
Err(e) => {
error!("could not deserialize foca member state: {e} (json: {state})");
None
Expand Down Expand Up @@ -1080,7 +1083,7 @@ async fn handle_notifications(
let added = { agent.members().write().add_member(&actor) };
debug!("Member Up {actor:?} (added: {added})");
if added {
debug!("Member Up {actor:?}");
info!("Member Up {actor:?}");
increment_counter!("corro.gossip.member.added", "id" => actor.id().0.to_string(), "addr" => actor.addr().to_string());
// actually added a member
// notify of new cluster size
Expand Down
8 changes: 4 additions & 4 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ fn build_quinn_transport_config(config: &GossipConfig) -> quinn::TransportConfig
.unwrap(),
));

// max 1024 concurrent bidirectional streams
transport_config.max_concurrent_bidi_streams(1024u32.into());
// max concurrent bidirectional streams
transport_config.max_concurrent_bidi_streams(32u32.into());

// max 10240 concurrent unidirectional streams
transport_config.max_concurrent_uni_streams(10240u32.into());
// max concurrent unidirectional streams
transport_config.max_concurrent_uni_streams(512u32.into());

if let Some(max_mtu) = config.max_mtu {
info!("Setting maximum MTU for QUIC at {max_mtu}");
Expand Down
39 changes: 25 additions & 14 deletions crates/corro-agent/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::HashMap,
net::SocketAddr,
num::NonZeroU32,
pin::Pin,
Expand All @@ -16,6 +17,7 @@ use metrics::{gauge, histogram};
use parking_lot::RwLock;
use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng};
use rusqlite::params;
use spawn::spawn_counted;
use speedy::Writable;
use strum::EnumDiscriminants;
use tokio::{
Expand Down Expand Up @@ -132,7 +134,7 @@ pub fn runtime_loop(

// foca SWIM operations loop.
// NOTE: every turn of that loop should be fast or else we risk being suspected of being down
tokio::spawn({
spawn_counted({
let config = config.clone();
let agent = agent.clone();
let mut tripwire = tripwire.clone();
Expand All @@ -142,7 +144,7 @@ pub fn runtime_loop(

let member_events_chunks =
tokio_stream::wrappers::BroadcastStream::new(member_events.resubscribe())
.chunks_timeout(100, Duration::from_secs(30));
.chunks_timeout(1000, Duration::from_secs(2));
tokio::pin!(member_events_chunks);

#[derive(EnumDiscriminants)]
Expand Down Expand Up @@ -283,6 +285,9 @@ pub fn runtime_loop(
}
}

// extra time for leave message to propagate
tokio::time::sleep(Duration::from_secs(2)).await;

break;
}
Branch::Foca(input) => match input {
Expand Down Expand Up @@ -331,8 +336,12 @@ pub fn runtime_loop(
let splitted: Vec<_> = evts
.iter()
.flatten()
.filter_map(|evt| {
let actor = evt.actor();
.fold(HashMap::new(), |mut acc, evt| {
acc.insert(evt.actor(), evt.as_str());
acc
})
.into_iter()
.filter_map(|(actor, evt)| {
let foca_state = {
// need to bind this...
let foca_state = foca
Expand All @@ -350,9 +359,8 @@ pub fn runtime_loop(
foca_state
};

foca_state.map(|foca_state| {
(actor.id(), actor.addr(), evt.as_str(), foca_state)
})
foca_state
.map(|foca_state| (actor.id(), actor.addr(), evt, foca_state))
})
.collect();

Expand All @@ -372,22 +380,25 @@ pub fn runtime_loop(
let tx = conn.transaction()?;

for (id, address, state, foca_state) in splitted {
tx.prepare_cached(
"
INSERT INTO __corro_members (id, address, state, foca_state)
trace!(
"updating {id} {address} as {state} w/ state: {foca_state:?}",
);
let upserted = tx.prepare_cached("INSERT INTO __corro_members (id, address, state, foca_state)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
address = excluded.address,
state = excluded.state,
foca_state = excluded.foca_state;
",
)?
foca_state = excluded.foca_state;")?
.execute(params![
id,
address.to_string(),
state,
foca_state
])?;

if upserted != 1 {
warn!("did not update member");
}
}

tx.commit()?;
Expand Down Expand Up @@ -691,7 +702,7 @@ fn transmit_broadcast(payload: Bytes, transport: Transport, addr: SocketAddr) {
}

if let Err(e) = stream.finish().await {
warn!("error finishing broadcast uni stream to {addr}: {e}");
debug!("could not finish broadcast uni stream to {addr}: {e}");
}
});
}
9 changes: 8 additions & 1 deletion crates/corro-types/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt, net::SocketAddr, ops::Deref};
use std::{fmt, hash::Hash, net::SocketAddr, ops::Deref};

use foca::Identity;
use rusqlite::{
Expand Down Expand Up @@ -105,6 +105,13 @@ pub struct Actor {
bump: u16,
}

impl Hash for Actor {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
self.addr.hash(state);
}
}

impl Actor {
pub fn new(id: ActorId, addr: SocketAddr) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion crates/corrosion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ struct Cli {
long = "config",
short,
global = true,
default_value = "corrosion.toml"
default_value = "/etc/corrosion/config.toml"
)]
config_path: Utf8PathBuf,

Expand Down