add partitioning scheme for unresolved shuffle and shuffle reader exec #1144
ballista/core/src/serde/mod.rs
@@ -291,8 +294,11 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
)?))
}
PhysicalPlanType::ShuffleReader(shuffle_reader) => {
let default_codec =
Would it make sense to make default_codec a property of BallistaPhysicalExtensionCodec, so it can be overridden if needed? BallistaLogicalExtensionCodec does a similar thing. (Also, there are a few other places where DefaultPhysicalCodec is created.)
good point, thanks!
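The suggestion above can be sketched roughly as follows. This is a minimal, dependency-free illustration, not the code from this PR: `ExtensionCodec`, `DefaultCodec`, `CustomCodec`, and `BallistaLikeCodec` are hypothetical stand-ins for DataFusion's codec trait, its default codec, a user-supplied override, and `BallistaPhysicalExtensionCodec` respectively.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a physical extension codec trait.
trait ExtensionCodec {
    fn name(&self) -> &str;
}

/// Hypothetical stand-in for the default codec that was created inline.
struct DefaultCodec;

impl ExtensionCodec for DefaultCodec {
    fn name(&self) -> &str {
        "default"
    }
}

/// Hypothetical user-supplied codec used to override the default.
struct CustomCodec;

impl ExtensionCodec for CustomCodec {
    fn name(&self) -> &str {
        "custom"
    }
}

/// Sketch of a codec that owns its fallback codec as a field instead of
/// constructing a new default codec at every call site.
struct BallistaLikeCodec {
    default_codec: Arc<dyn ExtensionCodec>,
}

impl Default for BallistaLikeCodec {
    /// Without an override, fall back to the stock default codec.
    fn default() -> Self {
        Self {
            default_codec: Arc::new(DefaultCodec),
        }
    }
}

impl BallistaLikeCodec {
    /// Lets callers inject their own fallback codec.
    fn new(default_codec: Arc<dyn ExtensionCodec>) -> Self {
        Self { default_codec }
    }

    /// Name of the codec that nested plan nodes would be delegated to.
    fn fallback_name(&self) -> &str {
        self.default_codec.name()
    }
}
```

With this shape, `BallistaLikeCodec::default()` keeps the existing behavior, while `BallistaLikeCodec::new(Arc::new(CustomCodec))` swaps the fallback in one place.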
@Dandandan would you be able to have a look, please?
Changes I think look good. Can we add a test for it?
@Dandandan should be ready for another look
@@ -50,14 +50,15 @@ message ShuffleWriterExecNode {
message UnresolvedShuffleExecNode {
uint32 stage_id = 1;
datafusion_common.Schema schema = 2;
uint32 output_partition_count = 4;
datafusion.Partitioning partitioning = 5;
Just one small note: is there any reason we can't use 3 instead of 5?
I thought there used to be a field with field number 3 that was later deleted, so reusing 3 for the new field would break backward compatibility. I don't think we need to preserve backward compatibility here, though. I'm not sure whether any users do rolling updates of their Ballista scheduler and executors, but I wanted to be safe.
https://protobuf.dev/programming-guides/proto3/#consequences
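To make the concern concrete: on the protobuf wire, each field is preceded by a key computed as `(field_number << 3) | wire_type`, so a reader identifies a field only by its number. The sketch below just computes those keys. The helper names `field_key` and `split_key` are made up for illustration, and nothing here claims what the old field 3 actually was.

```rust
/// The two protobuf wire types relevant to this sketch.
#[derive(Clone, Copy)]
enum WireType {
    /// Used for scalar integers such as `uint32`.
    Varint = 0,
    /// Used for strings, bytes, and embedded messages.
    LengthDelimited = 2,
}

/// Key that precedes a field on the wire: (field_number << 3) | wire_type.
fn field_key(field_number: u32, wire_type: WireType) -> u32 {
    (field_number << 3) | wire_type as u32
}

/// Splits a key back into (field_number, wire_type bits), as a decoder would.
fn split_key(key: u32) -> (u32, u32) {
    (key >> 3, key & 0x7)
}
```

Under these rules the new `partitioning` message at field 5 is written with key `0x2A`. Had it reused number 3, bytes produced by a peer that still used the old meaning of field 3 would be decoded under the new meaning, which is the situation the linked guide warns about.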
I would assume that it's not used much.
Which issue does this PR close?
Closes #16.
Rationale for this change
Unresolved shuffle and shuffle reader execs do not carry partitioning info. This becomes a problem when a stage contains InterleaveExec, which requires all of its children to have hash partitioning on the same expressions. Currently, even when the partitioning satisfies InterleaveExec's requirements, the children report UnknownPartitioning, so planning such plans into stages fails.
What changes are included in this PR?
Carry partitioning scheme through shuffles
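As a rough illustration of why this matters, here is a simplified model of the check described in the rationale. It is a sketch under assumptions, not DataFusion's implementation: the `Partitioning` enum below is a hypothetical reduction that uses column names instead of physical expressions, and `can_interleave` is a hand-written approximation of the requirement.

```rust
/// Simplified, hypothetical model of a plan node's output partitioning.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// Hash partitioning on the given column names into N partitions.
    Hash(Vec<String>, usize),
    /// Partitioning scheme unknown; only the partition count is known.
    UnknownPartitioning(usize),
}

/// Returns true only if there is at least one child and every child is
/// hash partitioned in exactly the same way as the first one.
fn can_interleave(children: &[Partitioning]) -> bool {
    match children.first() {
        Some(first) => {
            matches!(first, Partitioning::Hash(..))
                && children.iter().all(|child| child == first)
        }
        None => false,
    }
}
```

In this model, two children that both report `Hash(["a"], 4)` pass the check, while the same children reporting `UnknownPartitioning(4)` do not, even if the underlying data is hash partitioned identically. Carrying the real scheme through the shuffle nodes is what lets the check succeed.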
Are there any user-facing changes?