refactor(stream): send barriers in batch when possible #19932
Conversation
The cluster runs a release build with 256 parallelism in total, deployed via risedev on a 16c32GB EC2 instance: d7b5cf5.

[figure: benchmark comparison between this branch and main]

An additional test not included in the figure indicates that the main branch experiences the same issue as when […]
```rust
let limit = (self.context.config.developer).exchange_concurrent_dispatchers;

// Only barrier can be batched for now.
```
Not strongly related to this PR: I wonder whether peeking the messages (including barriers/chunks) and batching them by size into one gRPC message could be a general optimization to amortize network cost.

Theoretically it is HTTP/2 and TCP under the hood, so the network packets should be batched already; but given that the barrier batch optimization works, I wonder whether there are other overheads in gRPC that we are not aware of.

I have also seen a few asks for gRPC streaming RPC message batching elsewhere:
> I wonder whether there are other overheads that we are not aware of in gRPC.

This PR also demonstrates the improvement when tested on a single compute node, where all channels are local.

> but given that the barrier batch optimization works

This PR primarily reduces the total number of incoming messages for both `RemoteInput` and `LocalInput`. However, from the perspective of the `MergeExecutor`, the total number of messages to poll doesn't change, since the `BarrierBatch` has been reverted to multiple `Barrier`s at the output of `RemoteInput` and `LocalInput`.

Regarding the source of the performance gain in this PR, I believe that although the total number of messages remains unchanged for the `MergeExecutor`, the cost of polling a message can sometimes be reduced, due to this very simple new state in the auto-generated future.
```diff
@@ -1150,6 +1150,9 @@ pub struct StreamingDeveloperConfig {
     /// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..."
     /// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks.
     pub switch_jdbc_pg_to_native: bool,
+
+    #[serde(default = "default::developer::stream_max_barrier_batch_size")]
```
Would you add some docs?
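For illustration, the docs might look like this (both the wording and the exact semantics of the knob are my assumption, not the PR's):

```rust
/// The maximum number of consecutive barriers that a dispatcher may merge
/// into a single `BarrierBatch` message before flushing it downstream.
/// (Assumed semantics; see the PR discussion for the authoritative meaning.)
#[serde(default = "default::developer::stream_max_barrier_batch_size")]
pub max_barrier_batch_size: usize,
```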
```rust
impl DispatcherMessageBatch {
    fn into_messages(self) -> impl Iterator<Item = DispatcherMessage> {
        #[auto_enums::auto_enum(Iterator)]
```
I think we can simply adopt `Either` instead of creating a new TAIT, which may confuse IDEs.
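To illustrate, a minimal sketch of the `Either`-based version (the message types are stubbed so the example stands alone; this is not the PR's actual code):

```rust
use either::Either;

// Stub message types standing in for the real ones.
enum DispatcherMessage {
    Chunk(u32),
    Barrier(u64),
}

enum DispatcherMessageBatch {
    Chunk(u32),
    BarrierBatch(Vec<u64>),
}

impl DispatcherMessageBatch {
    fn into_messages(self) -> impl Iterator<Item = DispatcherMessage> {
        // `Either` implements `Iterator` whenever both sides do, so no new
        // TAIT is introduced and IDEs can resolve the type directly.
        match self {
            Self::Chunk(c) => Either::Left(std::iter::once(DispatcherMessage::Chunk(c))),
            Self::BarrierBatch(bs) => {
                Either::Right(bs.into_iter().map(DispatcherMessage::Barrier))
            }
        }
    }
}
```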
```rust
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatch {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<BarrierMutationType>>),
    Watermark(Watermark),
}

pub type DispatcherBarriers = Vec<DispatcherBarrier>;

#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum DispatcherMessageBatch {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<()>>),
    Watermark(Watermark),
}
```
Can we unify these types with a type parameter? Also, would you add some docs?
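One possible shape of that unification, sketched with stubbed types so it compiles on its own (not the PR's code):

```rust
// Stubs standing in for the real RisingWave types.
#[derive(Debug, PartialEq, Clone)]
pub struct StreamChunk;
#[derive(Debug, PartialEq, Clone)]
pub struct Watermark;
#[derive(Debug, PartialEq, Clone)]
pub struct BarrierInner<M>(pub M);
#[derive(Debug, PartialEq, Clone)]
pub struct BarrierMutationType;

/// A single generic definition covers both batch types; only the barrier's
/// mutation payload differs between them.
#[derive(Debug, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<M>>),
    Watermark(Watermark),
}

pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;
```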
```rust
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatch {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<BarrierMutationType>>),
```
Do we have some empirical idea of the length of a barrier batch? If it's small, shall we consider `SmallVec` here?
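For reference, a minimal sketch of the `SmallVec` variant (the inline capacity of 4 is an arbitrary assumption, not a measured batch length):

```rust
use smallvec::{smallvec, SmallVec};

// Up to 4 epochs (stubbed as u64) are stored inline, so short batches need
// no heap allocation; longer batches transparently spill to the heap.
type BarrierBatch = SmallVec<[u64; 4]>;

fn main() {
    let batch: BarrierBatch = smallvec![1, 2, 3];
    assert!(!batch.spilled()); // still inline, no allocation happened
}
```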
```rust
let mut input = self.input.execute().peekable();
let mut end_of_stream = false;
while !end_of_stream {
    let Some(msg) = input.next().await else {
        end_of_stream = true;
        continue;
    };
    let mut barrier_batch = vec![];
```
Is it possible to decouple the logic for batching messages from dispatching them?
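As a sketch of the idea (hypothetical names, not a concrete API proposal): the pure batching step could live in its own function that turns already-received messages into batches, leaving the loop above to only dispatch what comes out.

```rust
enum Msg {
    Chunk(u32),
    Barrier(u64),
}

enum MsgBatch {
    Chunk(u32),
    BarrierBatch(Vec<u64>),
}

/// Pure batching: merges consecutive barriers (up to `max` per batch) and
/// passes everything else through unchanged. No dispatching happens here.
fn batch(msgs: Vec<Msg>, max: usize) -> Vec<MsgBatch> {
    let mut out = Vec::new();
    let mut barriers = Vec::new();
    for msg in msgs {
        match msg {
            Msg::Barrier(epoch) => {
                barriers.push(epoch);
                if barriers.len() == max {
                    out.push(MsgBatch::BarrierBatch(std::mem::take(&mut barriers)));
                }
            }
            Msg::Chunk(c) => {
                // A non-barrier message closes any open barrier batch first,
                // preserving the original message order.
                if !barriers.is_empty() {
                    out.push(MsgBatch::BarrierBatch(std::mem::take(&mut barriers)));
                }
                out.push(MsgBatch::Chunk(c));
            }
        }
    }
    if !barriers.is_empty() {
        out.push(MsgBatch::BarrierBatch(barriers));
    }
    out
}
```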
```rust
.config
.developer
.max_barrier_batch_size
.saturating_sub(1);
```
Why do we `sub` here?
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Excessive barrier messages between dispatchers and mergers have been shown to hinder performance by consuming CPU on message polling. The issue becomes significant when parallelism is high, because a single barrier results in N×M barrier messages between dispatchers and mergers. Please refer to the attached benchmark for further details.

This PR aims to reduce the number of barrier messages between dispatcher and merger by combining consecutive non-mutation barriers into one message.
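To make the N×M fan-out concrete, a back-of-the-envelope illustration (all numbers are made up):

```rust
// With N dispatchers each connected to M mergers, every barrier fans out
// into N * M messages; batching B consecutive barriers into one message
// amortizes that down to N * M / B messages per barrier.
fn main() {
    let (n, m, b) = (16u64, 16, 4);
    println!("per barrier, unbatched: {} messages", n * m); // 256
    println!("per barrier, batched:   {} messages", n * m / b); // 64
}
```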
More discussion here.
Checklist
Documentation
Release note