From f7ee62f28761f769eaaef16f735d78416be8d399 Mon Sep 17 00:00:00 2001 From: Pedro Eugenio Rocha Pedreira Date: Wed, 11 Dec 2024 16:54:44 -0800 Subject: [PATCH] feat(planbuilder): Accept schema for tableWriter() (#11829) Summary: When creating a TableWriter node, allowing the client to specify a schema different from the default (the output of the previous operator). Reviewed By: Yuhta Differential Revision: D67102795 --- velox/exec/tests/utils/PlanBuilder.cpp | 5 +++-- velox/exec/tests/utils/PlanBuilder.h | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 08a3698913a2..a9cf1bccb8fd 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -443,9 +443,10 @@ PlanBuilder& PlanBuilder::tableWrite( const std::unordered_map& serdeParameters, const std::shared_ptr& options, const std::string& outputFileName, - const common::CompressionKind compressionKind) { + const common::CompressionKind compressionKind, + const RowTypePtr& schema) { VELOX_CHECK_NOT_NULL(planNode_, "TableWrite cannot be the source node"); - auto rowType = planNode_->outputType(); + auto rowType = schema ? schema : planNode_->outputType(); std::vector> columnHandles; diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 264df5d2ea63..4bd09fc680f9 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -485,6 +485,8 @@ class PlanBuilder { /// only be specified in non-bucketing write. /// @param compressionKind Compression scheme to use for writing the /// output data files. + /// @param schema Output schema to be passed to the writer. By default use the + /// output of the previous operator. PlanBuilder& tableWrite( const std::string& outputDirectoryPath, const std::vector& partitionBy, @@ -499,7 +501,8 @@ class PlanBuilder { const std::unordered_map& serdeParameters = {}, const std::shared_ptr& options = nullptr, const std::string& outputFileName = "", - const common::CompressionKind = common::CompressionKind_NONE); + const common::CompressionKind = common::CompressionKind_NONE, + const RowTypePtr& schema = nullptr); /// Add a TableWriteMergeNode. PlanBuilder& tableWriteMerge(