-
Notifications
You must be signed in to change notification settings - Fork 199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add partitioning scheme for unresolved shuffle and shuffle reader exec #1144
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ | |
use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction}; | ||
|
||
use arrow_flight::sql::ProstMessageExt; | ||
use datafusion::arrow::datatypes::SchemaRef; | ||
use datafusion::common::{DataFusionError, Result}; | ||
use datafusion::execution::FunctionRegistry; | ||
use datafusion::physical_plan::{ExecutionPlan, Partitioning}; | ||
|
@@ -29,6 +30,8 @@ use datafusion_proto::logical_plan::file_formats::{ | |
JsonLogicalExtensionCodec, ParquetLogicalExtensionCodec, | ||
}; | ||
use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning; | ||
use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning; | ||
use datafusion_proto::physical_plan::to_proto::serialize_partitioning; | ||
use datafusion_proto::protobuf::proto_error; | ||
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode}; | ||
use datafusion_proto::{ | ||
|
@@ -291,8 +294,11 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { | |
)?)) | ||
} | ||
PhysicalPlanType::ShuffleReader(shuffle_reader) => { | ||
let default_codec = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would it make sense to make default_codec property There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point, thanks! |
||
datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {}; | ||
let stage_id = shuffle_reader.stage_id as usize; | ||
let schema = Arc::new(convert_required!(shuffle_reader.schema)?); | ||
let schema: SchemaRef = | ||
Arc::new(convert_required!(shuffle_reader.schema)?); | ||
let partition_location: Vec<Vec<PartitionLocation>> = shuffle_reader | ||
.partition | ||
.iter() | ||
|
@@ -309,16 +315,39 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { | |
.collect::<Result<Vec<_>, _>>() | ||
}) | ||
.collect::<Result<Vec<_>, DataFusionError>>()?; | ||
let shuffle_reader = | ||
ShuffleReaderExec::try_new(stage_id, partition_location, schema)?; | ||
let partitioning = parse_protobuf_partitioning( | ||
shuffle_reader.partitioning.as_ref(), | ||
registry, | ||
schema.as_ref(), | ||
&default_codec, | ||
)?; | ||
let partitioning = partitioning | ||
.ok_or_else(|| proto_error("missing required partitioning field"))?; | ||
let shuffle_reader = ShuffleReaderExec::try_new( | ||
stage_id, | ||
partition_location, | ||
schema, | ||
partitioning, | ||
)?; | ||
Ok(Arc::new(shuffle_reader)) | ||
} | ||
PhysicalPlanType::UnresolvedShuffle(unresolved_shuffle) => { | ||
let schema = Arc::new(convert_required!(unresolved_shuffle.schema)?); | ||
let default_codec = | ||
datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {}; | ||
let schema: SchemaRef = | ||
Arc::new(convert_required!(unresolved_shuffle.schema)?); | ||
let partitioning = parse_protobuf_partitioning( | ||
unresolved_shuffle.partitioning.as_ref(), | ||
registry, | ||
schema.as_ref(), | ||
&default_codec, | ||
)?; | ||
let partitioning = partitioning | ||
.ok_or_else(|| proto_error("missing required partitioning field"))?; | ||
Ok(Arc::new(UnresolvedShuffleExec::new( | ||
unresolved_shuffle.stage_id as usize, | ||
schema, | ||
unresolved_shuffle.output_partition_count as usize, | ||
partitioning, | ||
))) | ||
} | ||
} | ||
|
@@ -387,12 +416,17 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { | |
.collect::<Result<Vec<_>, _>>()?, | ||
}); | ||
} | ||
let default_codec = | ||
datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {}; | ||
let partitioning = | ||
serialize_partitioning(&exec.properties().partitioning, &default_codec)?; | ||
let proto = protobuf::BallistaPhysicalPlanNode { | ||
physical_plan_type: Some(PhysicalPlanType::ShuffleReader( | ||
protobuf::ShuffleReaderExecNode { | ||
stage_id, | ||
partition, | ||
schema: Some(exec.schema().as_ref().try_into()?), | ||
partitioning: Some(partitioning), | ||
}, | ||
)), | ||
}; | ||
|
@@ -404,12 +438,16 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { | |
|
||
Ok(()) | ||
} else if let Some(exec) = node.as_any().downcast_ref::<UnresolvedShuffleExec>() { | ||
let default_codec = | ||
datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {}; | ||
let partitioning = | ||
serialize_partitioning(&exec.properties().partitioning, &default_codec)?; | ||
let proto = protobuf::BallistaPhysicalPlanNode { | ||
physical_plan_type: Some(PhysicalPlanType::UnresolvedShuffle( | ||
protobuf::UnresolvedShuffleExecNode { | ||
stage_id: exec.stage_id as u32, | ||
schema: Some(exec.schema().as_ref().try_into()?), | ||
output_partition_count: exec.output_partition_count as u32, | ||
partitioning: Some(partitioning), | ||
}, | ||
)), | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just one small note, is there any reason we can't use 3 instead of 5
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought there was a field with field number 3 in the past that got deleted, so putting 3 for the new field would violate back compatibility. I don't think we need to preserve back compat here though, not sure if there are users that do rolling updates to their ballista scheduler and executors, but wanted to be safe.
https://protobuf.dev/programming-guides/proto3/#consequences
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would assume that its not used much