Skip to content

Commit

Permalink
I think this fixes the uhlc kerfluffle
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Aug 24, 2023
1 parent 6003b5f commit 30e0a74
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 83 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ tracing = "0.1.37"
tracing-filter = { version = "0.1.0-alpha.2", features = ["smallvec"] }
tracing-subscriber = { version = "0.3.16", features = ["json", "env-filter"] }
trust-dns-resolver = "0.22.0"
uhlc = { version = "0.6.0", features = ["defmt"] }
uhlc = { version = "0.5.2", features = ["defmt"] }
uuid = { version = "1.3.1", features = ["v4", "serde"] }
webpki = { version = "0.22.0", features = ["std"] }

Expand Down
21 changes: 8 additions & 13 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,14 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
let row = rows.next()?;
match row {
None => break,
Some(row) => match partials.entry((row.get(0)?, row.get(1)?)) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
let (range, last_seq, ts) = entry.get_mut();
range.insert(row.get(2)?..=row.get(3)?);
*last_seq = row.get(4)?;
*ts = row.get(5)?;
}
std::collections::hash_map::Entry::Vacant(entry) => {
let mut range: RangeInclusiveSet<i64> = Default::default();
range.insert(row.get(2)?..=row.get(3)?);
entry.insert((range, row.get(4)?, row.get(5)?));
}
},
Some(row) => {
let (range, last_seq, ts) =
partials.entry((row.get(0)?, row.get(1)?)).or_default();

range.insert(row.get(2)?..=row.get(3)?);
*last_seq = row.get(4)?;
*ts = row.get(5)?;
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,9 @@ pub async fn bidirectional_sync(
continue;
}
SyncMessage::V1(SyncMessageV1::Clock(ts)) => {
if let Err(e) = agent.clock().update_with_timestamp(&ts) {
if let Err(e) = agent.clock().update_with_timestamp(
&uhlc::Timestamp::new(ts.to_ntp64(), their_actor_id.into()),
) {
warn!(
"could not update clock from actor {their_actor_id}: {e}"
);
Expand Down
5 changes: 1 addition & 4 deletions crates/corro-types/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ impl ActorId {

impl Into<uhlc::ID> for ActorId {
fn into(self) -> uhlc::ID {
self.0
.as_u128()
.try_into()
.expect("attempted to convert a nil (0) actor id a uhlc ID and failed!")
self.0.into()
}
}

Expand Down
76 changes: 15 additions & 61 deletions crates/corro-types/src/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
fmt, io, mem,
fmt, io,
num::NonZeroU32,
ops::{Deref, RangeInclusive},
time::Duration,
Expand All @@ -18,7 +18,7 @@ use speedy::{Readable, Writable};
use time::OffsetDateTime;
use tokio::sync::mpsc::Sender;
use tracing::{error, trace};
use uhlc::{ParseTimestampError, NTP64};
use uhlc::{ParseNTP64Error, NTP64};

use crate::{
actor::{Actor, ActorId},
Expand Down Expand Up @@ -164,91 +164,45 @@ impl Changeset {
#[derive(Debug, thiserror::Error)]
pub enum TimestampParseError {
#[error("could not parse timestamp: {0:?}")]
Parse(ParseTimestampError),
Parse(ParseNTP64Error),
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[derive(
Debug, Default, Clone, Copy, Serialize, Deserialize, Readable, Writable, PartialEq, Eq,
)]
#[serde(transparent)]
pub struct Timestamp(uhlc::Timestamp);
pub struct Timestamp(pub u64);

impl Timestamp {
pub fn to_time(&self) -> OffsetDateTime {
let t = self.0.get_time();
let t = NTP64(self.0);
OffsetDateTime::from_unix_timestamp(t.as_secs() as i64).unwrap()
+ time::Duration::nanoseconds(t.subsec_nanos() as i64)
}
}

impl Deref for Timestamp {
type Target = uhlc::Timestamp;

fn deref(&self) -> &Self::Target {
&self.0
pub fn to_ntp64(&self) -> NTP64 {
NTP64(self.0)
}
}

impl fmt::Display for Timestamp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
NTP64(self.0).fmt(f)
}
}

impl From<uhlc::Timestamp> for Timestamp {
fn from(ts: uhlc::Timestamp) -> Self {
Self(ts)
}
}

const TS_SIZE: usize = mem::size_of::<u64>() + 16;

impl<'a, C> Readable<'a, C> for Timestamp
where
C: speedy::Context,
{
#[inline]
fn read_from<R: speedy::Reader<'a, C>>(
reader: &mut R,
) -> Result<Self, <C as speedy::Context>::Error> {
let time: u64 = Readable::read_from(reader)?;
let id: [u8; 16] = Readable::read_from(reader)?;
Ok(uhlc::Timestamp::new(
NTP64(time),
id.try_into().map_err(|e| speedy::Error::custom(e))?,
)
.into())
}

#[inline]
fn minimum_bytes_needed() -> usize {
TS_SIZE
}
}

impl<C> Writable<C> for Timestamp
where
C: speedy::Context,
{
#[inline]
fn write_to<T: ?Sized + speedy::Writer<C>>(
&self,
writer: &mut T,
) -> Result<(), <C as speedy::Context>::Error> {
self.get_time().0.write_to(writer)?;
self.get_id().to_le_bytes().write_to(writer)
}

#[inline]
fn bytes_needed(&self) -> Result<usize, C::Error> {
Ok(TS_SIZE)
Self(ts.get_time().as_u64())
}
}

impl FromSql for Timestamp {
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
match value {
rusqlite::types::ValueRef::Text(b) => match std::str::from_utf8(b) {
Ok(s) => match s.parse::<uhlc::Timestamp>() {
Ok(ts) => Ok(Timestamp(ts)),
Ok(s) => match s.parse::<NTP64>() {
Ok(ntp) => Ok(Timestamp(ntp.as_u64())),
Err(e) => Err(FromSqlError::Other(Box::new(TimestampParseError::Parse(e)))),
},
Err(e) => Err(FromSqlError::Other(Box::new(e))),
Expand All @@ -261,7 +215,7 @@ impl FromSql for Timestamp {
impl ToSql for Timestamp {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
Ok(rusqlite::types::ToSqlOutput::Owned(
rusqlite::types::Value::Text(self.0.to_string()),
rusqlite::types::Value::Text(NTP64(self.0).to_string()),
))
}
}
Expand Down

0 comments on commit 30e0a74

Please sign in to comment.