Skip to content

Commit

Permalink
quic: disconnect a peer if they send a stream > PACKET_DATA_SIZE (sol…
Browse files Browse the repository at this point in the history
…ana-labs#3297)

Peers should validate txs and never send txs larger than max size.
  • Loading branch information
alessandrod authored Oct 28, 2024
1 parent 2be7c2e commit 890bed2
Showing 1 changed file with 69 additions and 10 deletions.
79 changes: 69 additions & 10 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre
const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";

const CONNECTION_CLOSE_CODE_INVALID_STREAM: u32 = 5;
const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream";

/// Limit to 250K PPS
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;

Expand Down Expand Up @@ -1034,7 +1037,7 @@ async fn handle_connection(
);
let stable_id = connection.stable_id();
stats.total_connections.fetch_add(1, Ordering::Relaxed);
loop {
'conn: loop {
// Wait for new streams. If the peer is disconnected we get a cancellation signal and stop
// the connection task.
let mut stream = select! {
Expand Down Expand Up @@ -1119,7 +1122,7 @@ async fn handle_connection(
}
};

if handle_chunk(
match handle_chunk(
chunk,
&mut maybe_batch,
&remote_addr,
Expand All @@ -1129,8 +1132,21 @@ async fn handle_connection(
)
.await
{
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
// The stream is finished, break out of the loop and close the stream.
Ok(StreamState::Finished) => {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
// The stream is still active, continue reading.
Ok(StreamState::Receiving) => {}
Err(_) => {
// Disconnect peers that send invalid streams.
connection.close(
CONNECTION_CLOSE_CODE_INVALID_STREAM.into(),
CONNECTION_CLOSE_REASON_INVALID_STREAM,
);
break 'conn;
}
}
}

Expand All @@ -1155,15 +1171,25 @@ async fn handle_connection(
stats.total_connections.fetch_sub(1, Ordering::Relaxed);
}

// Return true if the server should drop the stream
enum StreamState {
// Stream is not finished, keep receiving chunks
Receiving,
// Stream is finished
Finished,
}

// Handle the chunks received from the stream. If the stream is finished, send the packet to the
// packet sender.
//
// Returns Err(()) if the stream is invalid.
async fn handle_chunk(
maybe_chunk: Option<quinn::Chunk>,
packet_accum: &mut Option<PacketAccumulator>,
remote_addr: &SocketAddr,
packet_sender: &AsyncSender<PacketAccumulator>,
stats: Arc<StreamerStats>,
peer_type: ConnectionPeerType,
) -> bool {
) -> Result<StreamState, ()> {
if let Some(chunk) = maybe_chunk {
trace!("got chunk: {:?}", chunk);

Expand All @@ -1184,10 +1210,10 @@ async fn handle_chunk(
if accum.meta.size > PACKET_DATA_SIZE {
// The stream window size is set to PACKET_DATA_SIZE, so one individual chunk can
// never exceed this size. A peer can send two chunks that together exceed the size
// tho, in which case we drop the stream.
// tho, in which case we report the error.
stats.invalid_stream_size.fetch_add(1, Ordering::Relaxed);
debug!("invalid stream size {}", accum.meta.size);
return true;
return Err(());
}
accum.chunks.push(chunk.bytes);
}
Expand Down Expand Up @@ -1239,15 +1265,18 @@ async fn handle_chunk(

trace!("sent {} byte packet for batching", bytes_sent);
}
return Ok(StreamState::Finished);
} else {
debug!("stream is empty");
stats
.total_packet_batches_none
.fetch_add(1, Ordering::Relaxed);

return Err(());
}
return true;
}

false
Ok(StreamState::Receiving)
}

#[derive(Debug)]
Expand Down Expand Up @@ -1498,6 +1527,7 @@ pub mod test {
assert_matches::assert_matches,
async_channel::unbounded as async_unbounded,
crossbeam_channel::{unbounded, Receiver},
quinn::{ApplicationClose, ConnectionError},
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair, signer::Signer},
std::collections::HashMap,
tokio::time::sleep,
Expand Down Expand Up @@ -2413,4 +2443,33 @@ pub mod test {
drop(tracker_1);
assert_eq!(stats.open_connections.load(Ordering::Relaxed), 0);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_client_connection_close_invalid_stream() {
let SpawnTestServerResult {
join_handle,
server_address,
stats,
exit,
..
} = setup_quic_server(None, TestServerConfig::default());

let client_connection = make_client_endpoint(&server_address, None).await;

let mut send_stream = client_connection.open_uni().await.unwrap();
send_stream
.write_all(&[42; PACKET_DATA_SIZE + 1])
.await
.unwrap();
match client_connection.closed().await {
ConnectionError::ApplicationClosed(ApplicationClose { error_code, reason }) => {
assert_eq!(error_code, CONNECTION_CLOSE_CODE_INVALID_STREAM.into());
assert_eq!(reason, CONNECTION_CLOSE_REASON_INVALID_STREAM);
}
_ => panic!("unexpected close"),
}
assert_eq!(stats.invalid_stream_size.load(Ordering::Relaxed), 1);
exit.store(true, Ordering::Relaxed);
join_handle.await.unwrap();
}
}

0 comments on commit 890bed2

Please sign in to comment.