Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(stream): send barriers in batch when possible #19932

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

zwang28
Copy link
Contributor

@zwang28 zwang28 commented Dec 25, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Excessive barrier messages between dispatcher and merger have been shown to hinder performance, by consuming CPU resources on message polling. This issue is significant when parallelism is high, because a barrier would result in NxM barrier messages between dispatcher and merger. Please refer to the benchmark attached below for further details.

This PR aims to reduce the number of barrier messages between dispatcher and merger, by combining consecutive non-mutation barriers into one message.

More discussion here.

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • My PR contains critical fixes that are necessary to be merged into the latest release.

Documentation

  • My PR needs documentation updates.
Release note

@zwang28 zwang28 force-pushed the wangzheng/batch_msg branch 4 times, most recently from 997af79 to 08e83f2 Compare December 25, 2024 03:37
@zwang28 zwang28 force-pushed the wangzheng/batch_msg branch from 08e83f2 to 366a139 Compare December 25, 2024 04:49
@zwang28
Copy link
Contributor Author

zwang28 commented Dec 25, 2024

The cluster is running a release build, 256 parallelism in total, deployed via risedev in a 16c32GB EC2: d7b5cf5

The comparison below shows that

  1. Barrier batching alleviates the barrier issue.
  2. Even with barrier batching, the source throughput is suboptimal—10,000 rows per second in the figure compared to the 100,000 rows per second defined by the source. This discrepancy arises because the system is not fully utilizing the CPUs for processing actual stream jobs; instead, it's dedicating resources to handling barriers. This is expected, as the barrier batching approach is reactive, intervening only when barrier overhead begins to drain CPU resources and create issues.
Screenshot 2024-12-25 at 15 22 52

An additional test not included in the figure indicates that the main branch experiences the same issue as when stream_max_barrier_batch_size=0. But the performance of the stream_max_barrier_batch_size=0 one is slightly worse. This appears to be caused by the implementation overhead introduced in this PR. I'll look into it.

@zwang28 zwang28 marked this pull request as ready for review December 25, 2024 07:29
let limit = (self.context.config.developer).exchange_concurrent_dispatchers;

// Only barrier can be batched for now.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strongly related to this PR: I wonder whether peaking the message including barrier/chunk and batch them by size into one gRPC message can be a general optimization to amortize network cost.

Theoretically it is http2 and tcp under the hook so the network package should be batched already but given that the barrier batch optimization works, I wonder whether there are other overheads that we are not aware of in gRPC.

I also saw a few asks for gRPC streaming rpc message batching elsewhere:

Copy link
Contributor Author

@zwang28 zwang28 Dec 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether there are other overheads that we are not aware of in gRPC.

This PR also demonstrates the improvement when tested in a single compute node, where all channels are local.

but given that the barrier batch optimization works

This PR primarily reduces the total number of incoming message for both RemoteInput and LocalInput. However from the perspective of MergeExecutor, the total number of message to poll doesn't change since BarrierBatch has been reverted to multiple Barriers for the output RemoteInput and LocalInput.

Regarding the source of performance gain in this PR, I believe that although the total number of messages remains unchanged for the MergeExecutor, the cost of polling a message can be reduced sometimes due to this very simple new state in the auto-generated future.

@@ -1150,6 +1150,9 @@ pub struct StreamingDeveloperConfig {
/// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..."
/// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks.
pub switch_jdbc_pg_to_native: bool,

#[serde(default = "default::developer::stream_max_barrier_batch_size")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you add some docs?


impl DispatcherMessageBatch {
fn into_messages(self) -> impl Iterator<Item = DispatcherMessage> {
#[auto_enums::auto_enum(Iterator)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can simply adopt Either instead of creating a new TAIT which may confuses IDEs.

Comment on lines +1049 to +1061
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatch {
Chunk(StreamChunk),
BarrierBatch(Vec<BarrierInner<BarrierMutationType>>),
Watermark(Watermark),
}
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum DispatcherMessageBatch {
Chunk(StreamChunk),
BarrierBatch(Vec<BarrierInner<()>>),
Watermark(Watermark),
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we unify these types with type parameter?

Also, would you add some docs?

#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatch {
Chunk(StreamChunk),
BarrierBatch(Vec<BarrierInner<BarrierMutationType>>),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have some empirical idea of the length of barrier batch? If it's small, shall we consider SmallVec here?

Comment on lines +410 to +417
let mut input = self.input.execute().peekable();
let mut end_of_stream = false;
while !end_of_stream {
let Some(msg) = input.next().await else {
end_of_stream = true;
continue;
};
let mut barrier_batch = vec![];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to decouple the logic for batching messages from dispatching them?

.config
.developer
.max_barrier_batch_size
.saturating_sub(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we sub here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants