Skip to content

Commit

Permalink
Backport connection stream counter to v1.17 (#991)
Browse files Browse the repository at this point in the history
* Backport ConnectionStreamCounter

* Addressed a feedback from Pankaj
  • Loading branch information
lijunwangs authored May 7, 2024
1 parent 19dea1d commit 58916d9
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 41 deletions.
1 change: 1 addition & 0 deletions streamer/src/nonblocking/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod quic;
pub mod recvmmsg;
pub mod sendmmsg;
mod stream_throttle;
102 changes: 61 additions & 41 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {
super::stream_throttle::ConnectionStreamCounter,
crate::{
quic::{configure_server, QuicServerError, StreamStats},
streamer::StakedNodes,
Expand Down Expand Up @@ -49,7 +50,7 @@ use {
// introduce any other awaits while holding the RwLock.
sync::{Mutex, MutexGuard},
task::JoinHandle,
time::timeout,
time::{sleep, timeout},
},
};

Expand All @@ -72,7 +73,6 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre

const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
const STREAM_STOP_CODE_THROTTLING: u32 = 15;

/// Limit to 500K PPS
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 500;
Expand Down Expand Up @@ -360,14 +360,16 @@ fn handle_and_cache_new_connection(
remote_addr,
);

if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
remote_addr.port(),
Some(connection.clone()),
params.stake,
timing::timestamp(),
params.max_connections_per_peer,
) {
if let Some((last_update, stream_exit, stream_counter)) = connection_table_l
.try_add_connection(
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
remote_addr.port(),
Some(connection.clone()),
params.stake,
timing::timestamp(),
params.max_connections_per_peer,
)
{
let peer_type = connection_table_l.peer_type;
drop(connection_table_l);

Expand All @@ -387,6 +389,7 @@ fn handle_and_cache_new_connection(
wait_for_chunk_timeout,
max_unstaked_connections,
max_streams_per_ms,
stream_counter,
));
Ok(())
} else {
Expand Down Expand Up @@ -770,15 +773,6 @@ fn max_streams_for_connection_in_100ms(
}
}

fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) -> bool {
if tokio::time::Instant::now().duration_since(*last_instant) > STREAM_THROTTLING_INTERVAL {
*last_instant = tokio::time::Instant::now();
true
} else {
false
}
}

#[allow(clippy::too_many_arguments)]
async fn handle_connection(
connection: Connection,
Expand All @@ -791,6 +785,7 @@ async fn handle_connection(
wait_for_chunk_timeout: Duration,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stream_counter: Arc<ConnectionStreamCounter>,
) {
let stats = params.stats;
debug!(
Expand All @@ -801,41 +796,54 @@ async fn handle_connection(
);
let stable_id = connection.stable_id();
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let max_streams_per_100ms = max_streams_for_connection_in_100ms(
let max_streams_per_throttling_interval = max_streams_for_connection_in_100ms(
peer_type,
params.stake,
params.total_stake,
max_unstaked_connections,
max_streams_per_ms,
);
let mut last_throttling_instant = tokio::time::Instant::now();
let mut streams_in_current_interval = 0;

while !stream_exit.load(Ordering::Relaxed) {
if let Ok(stream) =
tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await
{
match stream {
Ok(mut stream) => {
if reset_throttling_params_if_needed(&mut last_throttling_instant) {
streams_in_current_interval = 0;
} else if streams_in_current_interval >= max_streams_per_100ms {
stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
match peer_type {
ConnectionPeerType::Unstaked => {
stats
.throttled_unstaked_streams
.fetch_add(1, Ordering::Relaxed);
}
ConnectionPeerType::Staked => {
stats
.throttled_staked_streams
.fetch_add(1, Ordering::Relaxed);
let throttle_interval_start: tokio::time::Instant =
stream_counter.reset_throttling_params_if_needed();
let streams_read_in_throttle_interval =
stream_counter.stream_count.load(Ordering::Relaxed);

if streams_read_in_throttle_interval >= max_streams_per_throttling_interval {
// The peer is sending faster than we're willing to read. Sleep for what's
// left of this read interval so the peer backs off.
let throttle_duration = STREAM_THROTTLING_INTERVAL
.saturating_sub(throttle_interval_start.elapsed());
if !throttle_duration.is_zero() {
debug!("Throttling stream from {remote_addr:?}, peer type: {:?}, total stake: {}, \
max_streams_per_interval: {max_streams_per_throttling_interval}, read_interval_streams: {streams_read_in_throttle_interval} \
throttle_duration: {throttle_duration:?}",
peer_type, params.total_stake);
stats.throttled_streams.fetch_add(1, Ordering::Relaxed);

match peer_type {
ConnectionPeerType::Unstaked => {
stats
.throttled_unstaked_streams
.fetch_add(1, Ordering::Relaxed);
}
ConnectionPeerType::Staked => {
stats
.throttled_staked_streams
.fetch_add(1, Ordering::Relaxed);
}
}
sleep(throttle_duration).await;
}
let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING));
continue;
}
streams_in_current_interval = streams_in_current_interval.saturating_add(1);
stream_counter.stream_count.fetch_add(1, Ordering::Relaxed);

stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
let stream_exit = stream_exit.clone();
Expand Down Expand Up @@ -1041,6 +1049,7 @@ struct ConnectionEntry {
last_update: Arc<AtomicU64>,
port: u16,
connection: Option<Connection>,
stream_counter: Arc<ConnectionStreamCounter>,
}

impl ConnectionEntry {
Expand All @@ -1050,13 +1059,15 @@ impl ConnectionEntry {
last_update: Arc<AtomicU64>,
port: u16,
connection: Option<Connection>,
stream_counter: Arc<ConnectionStreamCounter>,
) -> Self {
Self {
exit,
stake,
last_update,
port,
connection,
stream_counter,
}
}

Expand Down Expand Up @@ -1167,7 +1178,11 @@ impl ConnectionTable {
stake: u64,
last_update: u64,
max_connections_per_peer: usize,
) -> Option<(Arc<AtomicU64>, Arc<AtomicBool>)> {
) -> Option<(
Arc<AtomicU64>,
Arc<AtomicBool>,
Arc<ConnectionStreamCounter>,
)> {
let connection_entry = self.table.entry(key).or_default();
let has_connection_capacity = connection_entry
.len()
Expand All @@ -1177,15 +1192,20 @@ impl ConnectionTable {
if has_connection_capacity {
let exit = Arc::new(AtomicBool::new(false));
let last_update = Arc::new(AtomicU64::new(last_update));
let stream_counter = connection_entry
.first()
.map(|entry| entry.stream_counter.clone())
.unwrap_or(Arc::new(ConnectionStreamCounter::new()));
connection_entry.push(ConnectionEntry::new(
exit.clone(),
stake,
last_update.clone(),
port,
connection,
stream_counter.clone(),
));
self.total_size += 1;
Some((last_update, exit))
Some((last_update, exit, stream_counter))
} else {
if let Some(connection) = connection {
connection.close(
Expand Down
47 changes: 47 additions & 0 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
RwLock,
},
time::Duration,
};

pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
pub const STREAM_THROTTLING_INTERVAL: Duration =
Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS);

#[derive(Debug)]
pub(crate) struct ConnectionStreamCounter {
pub(crate) stream_count: AtomicU64,
last_throttling_instant: RwLock<tokio::time::Instant>,
}

impl ConnectionStreamCounter {
pub(crate) fn new() -> Self {
Self {
stream_count: AtomicU64::default(),
last_throttling_instant: RwLock::new(tokio::time::Instant::now()),
}
}

/// Reset the counter and last throttling instant and
/// return last_throttling_instant regardless it is reset or not.
pub(crate) fn reset_throttling_params_if_needed(&self) -> tokio::time::Instant {
let last_throttling_instant = *self.last_throttling_instant.read().unwrap();
if tokio::time::Instant::now().duration_since(last_throttling_instant)
> STREAM_THROTTLING_INTERVAL
{
let mut last_throttling_instant = self.last_throttling_instant.write().unwrap();
// Recheck as some other thread might have done throttling since this thread tried to acquire the write lock.
if tokio::time::Instant::now().duration_since(*last_throttling_instant)
> STREAM_THROTTLING_INTERVAL
{
*last_throttling_instant = tokio::time::Instant::now();
self.stream_count.store(0, Ordering::Relaxed);
}
*last_throttling_instant
} else {
last_throttling_instant
}
}
}

0 comments on commit 58916d9

Please sign in to comment.