Skip to content

Commit

Permalink
order nodes chosen for sync by last sync time also
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Jul 4, 2024
1 parent a82563d commit d3722ab
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
22 changes: 19 additions & 3 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,8 +792,15 @@ pub async fn handle_sync(
**id != agent.actor_id() && state.cluster_id == agent.cluster_id()
})
// Grab a ring-buffer index to the member RTT range
.map(|(id, state)| (*id, state.ring.unwrap_or(255), state.addr))
.collect::<Vec<(ActorId, u8, SocketAddr)>>()
.map(|(id, state)| {
(
*id,
state.ring.unwrap_or(255),
state.addr,
state.last_sync_ts,
)
})
.collect::<Vec<(ActorId, u8, SocketAddr, Option<Timestamp>)>>()
};

if candidates.is_empty() {
Expand All @@ -816,14 +823,16 @@ pub async fn handle_sync(
sync_state
.need_len_for_actor(&b.0)
.cmp(&sync_state.need_len_for_actor(&a.0))
// if equal, look at last sync time
.then_with(|| a.3.cmp(&b.3))
// if equal, look at proximity (via `ring`)
.then_with(|| a.1.cmp(&b.1))
});

choices.truncate(desired_count);
choices
.into_iter()
.map(|(actor_id, _, addr)| (actor_id, addr))
.map(|(actor_id, _, addr, _)| (actor_id, addr))
.collect()
};

Expand Down Expand Up @@ -852,13 +861,20 @@ pub async fn handle_sync(
info!(
"synced {n} changes w/ {} in {}s @ {} changes/s",
chosen
.clone()
.into_iter()
.map(|(actor_id, _)| actor_id.to_string())
.collect::<Vec<_>>()
.join(", "),
elapsed.as_secs_f64(),
n as f64 / elapsed.as_secs_f64()
);

let ts = Timestamp::from(agent.clock().new_timestamp());
for (actor_id, _) in chosen {
let mut members = agent.members().write();
members.update_sync_ts(actor_id, ts);
}
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-types/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ pub enum TimestampParseError {
Parse(ParseNTP64Error),
}

#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd, Hash)]
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd, Ord, Hash)]
#[serde(transparent)]
pub struct Timestamp(pub NTP64);

Expand Down
8 changes: 8 additions & 0 deletions crates/corro-types/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct MemberState {
pub cluster_id: ClusterId,

pub ring: Option<u8>,
pub last_sync_ts: Option<Timestamp>,
}

impl MemberState {
Expand All @@ -25,6 +26,7 @@ impl MemberState {
ts,
cluster_id,
ring: None,
last_sync_ts: None,
}
}

Expand Down Expand Up @@ -59,6 +61,12 @@ impl Members {
self.states.get(id)
}

pub fn update_sync_ts(&mut self, actor_id: ActorId, ts: Timestamp) {
if let Some(state) = self.states.get_mut(&actor_id) {
state.last_sync_ts = Some(ts);
}
}

// A result of `true` means that the effective list of
// cluster member addresses has changed
pub fn add_member(&mut self, actor: &Actor) -> MemberAddedResult {
Expand Down

0 comments on commit d3722ab

Please sign in to comment.