From 87f6fc613778e397cff710a5a6383e88546ccc60 Mon Sep 17 00:00:00 2001 From: Pavel Borzenkov Date: Tue, 23 Jul 2024 14:40:24 +0200 Subject: [PATCH 1/2] corro-client: fail if change is too new after subscribe If the client subscribes with known change ID fail the request in case Corrosion returns a change with a gap, meaning the original change has already been recycled. Instead of silently ignoring the error force the client to subscribe from scratch. --- crates/corro-client/src/lib.rs | 2 ++ crates/corro-client/src/sub.rs | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/corro-client/src/lib.rs b/crates/corro-client/src/lib.rs index f1c4ed4b..3f8fce13 100644 --- a/crates/corro-client/src/lib.rs +++ b/crates/corro-client/src/lib.rs @@ -139,6 +139,7 @@ impl CorrosionApiClient { self.api_client.clone(), self.api_addr, res.into_body(), + from, )) } @@ -189,6 +190,7 @@ impl CorrosionApiClient { self.api_client.clone(), self.api_addr, res.into_body(), + from, )) } diff --git a/crates/corro-client/src/sub.rs b/crates/corro-client/src/sub.rs index 71128bd1..74a779b5 100644 --- a/crates/corro-client/src/sub.rs +++ b/crates/corro-client/src/sub.rs @@ -94,13 +94,14 @@ where client: hyper::Client, api_addr: SocketAddr, body: hyper::Body, + change_id: Option, ) -> Self { Self { id, client, api_addr, - observed_eoq: false, - last_change_id: None, + observed_eoq: change_id.is_some(), + last_change_id: change_id, stream: Some(FramedRead::new( StreamReader::new(IoBodyStream { body }), LinesBytesCodec::default(), From 927163e6a609c528f768e5f97e6ba1459f8d63dd Mon Sep 17 00:00:00 2001 From: Pavel Borzenkov Date: Tue, 23 Jul 2024 14:41:47 +0200 Subject: [PATCH 2/2] corro-client: add 3 seconds timeout to DNS resolves Otherwise they may take a while without printing anything to the logs, which makes it really hard to debug problems. --- crates/corro-client/src/lib.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/corro-client/src/lib.rs b/crates/corro-client/src/lib.rs index 3f8fce13..e23bd017 100644 --- a/crates/corro-client/src/lib.rs +++ b/crates/corro-client/src/lib.rs @@ -12,10 +12,13 @@ use std::{ time::{self, Duration, Instant}, }; use sub::{QueryStream, SubscriptionStream}; -use tokio::sync::{RwLock, RwLockReadGuard}; +use tokio::{ + sync::{RwLock, RwLockReadGuard}, + time::timeout, +}; use tracing::{debug, info, warn}; use trust_dns_resolver::{ - error::ResolveError, + error::{ResolveError, ResolveErrorKind}, name_server::{GenericConnection, GenericConnectionProvider, TokioRuntime}, AsyncResolver, }; @@ -23,6 +26,7 @@ use uuid::Uuid; const HTTP2_CONNECT_TIMEOUT: Duration = Duration::from_secs(3); const HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10); +const DNS_RESOLVE_TIMEOUT: Duration = Duration::from_secs(3); #[derive(Clone)] pub struct CorrosionApiClient { @@ -589,9 +593,9 @@ impl AddrPicker { .and_then(|(host, port)| Some((host, port.parse().ok()?))) .ok_or(ResolveError::from("Invalid Corrosion server address"))?; - self.resolver - .lookup_ip(host) - .await? + timeout(DNS_RESOLVE_TIMEOUT, self.resolver.lookup_ip(host)) + .await + .map_err(|_| ResolveError::from(ResolveErrorKind::Timeout))?? .iter() .map(|addr| (addr, port).into()) .collect::>()