Skip to content

Commit

Permalink
Merge pull request #240 from superfly/fix-change-skip
Browse files Browse the repository at this point in the history
Return error of provided change ID is too old
  • Loading branch information
pborzenkov authored Jul 23, 2024
2 parents b40893b + 927163e commit d78072d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
16 changes: 11 additions & 5 deletions crates/corro-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@ use std::{
time::{self, Duration, Instant},
};
use sub::{QueryStream, SubscriptionStream};
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::{
sync::{RwLock, RwLockReadGuard},
time::timeout,
};
use tracing::{debug, info, warn};
use trust_dns_resolver::{
error::ResolveError,
error::{ResolveError, ResolveErrorKind},
name_server::{GenericConnection, GenericConnectionProvider, TokioRuntime},
AsyncResolver,
};
use uuid::Uuid;

const HTTP2_CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
const HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);
const DNS_RESOLVE_TIMEOUT: Duration = Duration::from_secs(3);

#[derive(Clone)]
pub struct CorrosionApiClient {
Expand Down Expand Up @@ -139,6 +143,7 @@ impl CorrosionApiClient {
self.api_client.clone(),
self.api_addr,
res.into_body(),
from,
))
}

Expand Down Expand Up @@ -189,6 +194,7 @@ impl CorrosionApiClient {
self.api_client.clone(),
self.api_addr,
res.into_body(),
from,
))
}

Expand Down Expand Up @@ -587,9 +593,9 @@ impl AddrPicker {
.and_then(|(host, port)| Some((host, port.parse().ok()?)))
.ok_or(ResolveError::from("Invalid Corrosion server address"))?;

self.resolver
.lookup_ip(host)
.await?
timeout(DNS_RESOLVE_TIMEOUT, self.resolver.lookup_ip(host))
.await
.map_err(|_| ResolveError::from(ResolveErrorKind::Timeout))??
.iter()
.map(|addr| (addr, port).into())
.collect::<Vec<_>>()
Expand Down
5 changes: 3 additions & 2 deletions crates/corro-client/src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ where
client: hyper::Client<HttpConnector, Body>,
api_addr: SocketAddr,
body: hyper::Body,
change_id: Option<ChangeId>,
) -> Self {
Self {
id,
client,
api_addr,
observed_eoq: false,
last_change_id: None,
observed_eoq: change_id.is_some(),
last_change_id: change_id,
stream: Some(FramedRead::new(
StreamReader::new(IoBodyStream { body }),
LinesBytesCodec::default(),
Expand Down

0 comments on commit d78072d

Please sign in to comment.