From 255d630a44766670abbc431e36a7b3d3d91b24e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A6=E4=BD=9F?= Date: Tue, 26 Nov 2024 10:46:53 +0800 Subject: [PATCH] feat: Specify target tracing driver IDs (#11560) Summary: Add a `driver_ids` flag that takes a comma-separated list of driver IDs specifying which traced drivers to replay. If `driver_ids` is empty, the driver IDs are extracted by listing the sub-directories under the trace directory. Part of https://github.com/facebookincubator/velox/issues/9668 Pull Request resolved: https://github.com/facebookincubator/velox/pull/11560 Reviewed By: oerling Differential Revision: D66438878 Pulled By: xiaoxmeng fbshipit-source-id: be5db2607578965b138aaf7b317f37589999d597 --- velox/core/PlanNode.h | 7 ++ velox/exec/OperatorTraceScan.cpp | 2 +- velox/exec/TraceUtil.cpp | 15 +-- velox/exec/TraceUtil.h | 11 +-- velox/exec/tests/TraceUtilTest.cpp | 6 +- .../tests/utils/HiveConnectorTestBase.cpp | 8 +- velox/exec/tests/utils/PlanBuilder.cpp | 7 +- velox/exec/tests/utils/PlanBuilder.h | 4 + velox/tool/trace/AggregationReplayer.h | 11 ++- velox/tool/trace/FilterProjectReplayer.h | 11 ++- velox/tool/trace/HashJoinReplayer.cpp | 1 + velox/tool/trace/HashJoinReplayer.h | 11 ++- velox/tool/trace/OperatorReplayerBase.cpp | 16 +-- velox/tool/trace/OperatorReplayerBase.h | 6 +- .../tool/trace/PartitionedOutputReplayer.cpp | 11 ++- velox/tool/trace/PartitionedOutputReplayer.h | 1 + velox/tool/trace/TableScanReplayer.cpp | 11 +-- velox/tool/trace/TableScanReplayer.h | 13 ++- velox/tool/trace/TableWriterReplayer.h | 9 +- velox/tool/trace/TraceReplayRunner.cpp | 18 ++-- velox/tool/trace/TraceReplayRunner.h | 1 + .../trace/tests/AggregationReplayerTest.cpp | 3 +- .../trace/tests/FilterProjectReplayerTest.cpp | 27 ++++- .../tool/trace/tests/HashJoinReplayerTest.cpp | 99 +++++++++++++++++-- .../tests/PartitionedOutputReplayerTest.cpp | 4 +- .../trace/tests/TableScanReplayerTest.cpp | 27 ++++- .../trace/tests/TableWriterReplayerTest.cpp 
| 2 + 27 files changed, 267 insertions(+), 75 deletions(-) diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index b4449989b8d5..6f910aab1a42 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -321,10 +321,12 @@ class TraceScanNode final : public PlanNode { const PlanNodeId& id, const std::string& traceDir, uint32_t pipelineId, + std::vector driverIds, const RowTypePtr& outputType) : PlanNode(id), traceDir_(traceDir), pipelineId_(pipelineId), + driverIds_(std::move(driverIds)), outputType_(outputType) {} const RowTypePtr& outputType() const override { @@ -348,12 +350,17 @@ class TraceScanNode final : public PlanNode { return pipelineId_; } + std::vector driverIds() const { + return driverIds_; + } + private: void addDetails(std::stringstream& stream) const override; // Directory of traced data, which is $traceRoot/$taskId/$nodeId. const std::string traceDir_; const uint32_t pipelineId_; + const std::vector driverIds_; const RowTypePtr outputType_; }; diff --git a/velox/exec/OperatorTraceScan.cpp b/velox/exec/OperatorTraceScan.cpp index a976367afa74..965715dcc5f8 100644 --- a/velox/exec/OperatorTraceScan.cpp +++ b/velox/exec/OperatorTraceScan.cpp @@ -34,7 +34,7 @@ OperatorTraceScan::OperatorTraceScan( getOpTraceDirectory( traceScanNode->traceDir(), traceScanNode->pipelineId(), - driverCtx->driverId), + traceScanNode->driverIds().at(driverCtx->driverId)), traceScanNode->outputType(), memory::MemoryManager::getInstance()->tracePool()); } diff --git a/velox/exec/TraceUtil.cpp b/velox/exec/TraceUtil.cpp index 79ebe95e704d..a49158b63430 100644 --- a/velox/exec/TraceUtil.cpp +++ b/velox/exec/TraceUtil.cpp @@ -18,6 +18,7 @@ #include +#include #include "velox/common/base/Exceptions.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" @@ -100,7 +101,7 @@ std::string getOpTraceDirectory( std::string getOpTraceDirectory( const std::string& nodeTraceDir, uint32_t pipelineId, - int driverId) { + uint32_t driverId) { return 
fmt::format("{}/{}/{}", nodeTraceDir, pipelineId, driverId); } @@ -232,11 +233,13 @@ std::vector listDriverIds( return driverIds; } -size_t getNumDrivers( - const std::string& nodeTraceDir, - uint32_t pipelineId, - const std::shared_ptr& fs) { - return listDriverIds(nodeTraceDir, pipelineId, fs).size(); +std::vector extractDriverIds(const std::string& driverIds) { + std::vector driverIdList; + if (driverIds.empty()) { + return driverIdList; + } + folly::split(",", driverIds, driverIdList); + return driverIdList; } bool canTrace(const std::string& operatorType) { diff --git a/velox/exec/TraceUtil.h b/velox/exec/TraceUtil.h index a0fa3cd03ccd..57425838c541 100644 --- a/velox/exec/TraceUtil.h +++ b/velox/exec/TraceUtil.h @@ -68,7 +68,7 @@ std::string getOpTraceDirectory( std::string getOpTraceDirectory( const std::string& nodeTraceDir, uint32_t pipelineId, - int driverId); + uint32_t driverId); /// Returns the file path for a given operator's traced input file. std::string getOpTraceInputFilePath(const std::string& opTraceDir); @@ -122,13 +122,8 @@ std::vector listDriverIds( uint32_t pipelineId, const std::shared_ptr& fs); -/// Extracts the number of drivers by listing the number of sub-directors under -/// the trace directory for a given pipeline. 'nodeTraceDir' is the trace -/// directory of the plan node. -size_t getNumDrivers( - const std::string& nodeTraceDir, - uint32_t pipelineId, - const std::shared_ptr& fs); +/// Extracts the driver IDs from the comma-separated list of driver IDs string. +std::vector extractDriverIds(const std::string& driverIds); /// Extracts task ids of the query tracing by listing the query trace directory. /// 'traceDir' is the root trace directory. 'queryId' is the query id. 
diff --git a/velox/exec/tests/TraceUtilTest.cpp b/velox/exec/tests/TraceUtilTest.cpp index f65d0fcf1bf7..c7bfe30e2215 100644 --- a/velox/exec/tests/TraceUtilTest.cpp +++ b/velox/exec/tests/TraceUtilTest.cpp @@ -196,7 +196,6 @@ TEST_F(TraceUtilTest, getDriverIds) { fs->mkdir(nodeTraceDir); const uint32_t pipelineId = 1; fs->mkdir(trace::getPipelineTraceDirectory(nodeTraceDir, pipelineId)); - ASSERT_EQ(getNumDrivers(nodeTraceDir, pipelineId, fs), 0); ASSERT_TRUE(listDriverIds(nodeTraceDir, pipelineId, fs).empty()); // create 3 drivers. const uint32_t driverId1 = 1; @@ -205,7 +204,6 @@ TEST_F(TraceUtilTest, getDriverIds) { fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId2)); const uint32_t driverId3 = 3; fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId3)); - ASSERT_EQ(getNumDrivers(nodeTraceDir, pipelineId, fs), 3); auto driverIds = listDriverIds(nodeTraceDir, pipelineId, fs); ASSERT_EQ(driverIds.size(), 3); std::sort(driverIds.begin(), driverIds.end()); @@ -215,7 +213,9 @@ TEST_F(TraceUtilTest, getDriverIds) { // Bad driver id. 
const std::string BadDriverId = "badDriverId"; fs->mkdir(fmt::format("{}/{}/{}", nodeTraceDir, pipelineId, BadDriverId)); - ASSERT_ANY_THROW(getNumDrivers(nodeTraceDir, pipelineId, fs)); ASSERT_ANY_THROW(listDriverIds(nodeTraceDir, pipelineId, fs)); + ASSERT_EQ(std::vector({1, 2, 4}), extractDriverIds("1,2,4")); + ASSERT_TRUE(extractDriverIds("").empty()); + ASSERT_NE(std::vector({1, 2}), extractDriverIds("1,2,4")); } } // namespace facebook::velox::exec::trace::test diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index da4ae409568b..28bf15bc314a 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -109,9 +109,13 @@ void HiveConnectorTestBase::writeToFile( velox::dwrf::WriterOptions options; options.config = config; options.schema = schema; - auto localWriteFile = std::make_unique(filePath, true, false); + auto fs = filesystems::getFileSystem(filePath, {}); + auto writeFile = fs->openFileForWrite( + filePath, + {.shouldCreateParentDirectories = true, + .shouldThrowOnFileAlreadyExists = false}); auto sink = std::make_unique( - std::move(localWriteFile), filePath); + std::move(writeFile), filePath); auto childPool = rootPool_->addAggregateChild("HiveConnectorTestBase.Writer"); options.memoryPool = childPool.get(); options.flushPolicyFactory = flushPolicyFactory; diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index e95ea01f101e..b17547e9554b 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -240,9 +240,14 @@ PlanBuilder& PlanBuilder::values( PlanBuilder& PlanBuilder::traceScan( const std::string& traceNodeDir, uint32_t pipelineId, + std::vector driverIds, const RowTypePtr& outputType) { planNode_ = std::make_shared( - nextPlanNodeId(), traceNodeDir, pipelineId, outputType); + nextPlanNodeId(), + traceNodeDir, + pipelineId, + 
std::move(driverIds), + outputType); return *this; } diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 5ccf3a216775..bcd152a37de1 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -323,10 +323,14 @@ class PlanBuilder { /// @param traceNodeDir The trace directory for a given plan node. /// @param pipelineId The pipeline id for the traced operator instantiated /// from the given plan node. + /// @param driverIds The target driver ID list for replay. The replaying + /// operator uses its driver instance id as the list index to get the traced + /// driver id for replay. /// @param outputType The type of the tracing data. PlanBuilder& traceScan( const std::string& traceNodeDir, uint32_t pipelineId, + std::vector driverIds, const RowTypePtr& outputType); /// Add an ExchangeNode. diff --git a/velox/tool/trace/AggregationReplayer.h b/velox/tool/trace/AggregationReplayer.h index 41338eb136a0..ccaeec143969 100644 --- a/velox/tool/trace/AggregationReplayer.h +++ b/velox/tool/trace/AggregationReplayer.h @@ -28,8 +28,15 @@ class AggregationReplayer : public OperatorReplayerBase { const std::string& queryId, const std::string& taskId, const std::string& nodeId, - const std::string& operatorType) - : OperatorReplayerBase(traceDir, queryId, taskId, nodeId, operatorType) {} + const std::string& operatorType, + const std::string& driverIds) + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + operatorType, + driverIds) {} private: core::PlanNodePtr createPlanNode( diff --git a/velox/tool/trace/FilterProjectReplayer.h b/velox/tool/trace/FilterProjectReplayer.h index 00c1a344a766..8e1bd9af0a10 100644 --- a/velox/tool/trace/FilterProjectReplayer.h +++ b/velox/tool/trace/FilterProjectReplayer.h @@ -33,8 +33,15 @@ class FilterProjectReplayer : public OperatorReplayerBase { const std::string& queryId, const std::string& taskId, const std::string& nodeId, - const std::string& 
operatorType) - : OperatorReplayerBase(rootDir, queryId, taskId, nodeId, operatorType) {} + const std::string& operatorType, + const std::string& driverIds) + : OperatorReplayerBase( + rootDir, + queryId, + taskId, + nodeId, + operatorType, + driverIds) {} private: // Create either a standalone FilterNode, a standalone ProjectNode, or a diff --git a/velox/tool/trace/HashJoinReplayer.cpp b/velox/tool/trace/HashJoinReplayer.cpp index fc8fd3efce16..c39a7f611125 100644 --- a/velox/tool/trace/HashJoinReplayer.cpp +++ b/velox/tool/trace/HashJoinReplayer.cpp @@ -40,6 +40,7 @@ core::PlanNodePtr HashJoinReplayer::createPlanNode( .traceScan( nodeTraceDir_, pipelineIds_.at(1), // Build side + driverIds_, exec::trace::getDataType(planFragment_, nodeId_, 1)) .planNode(), hashJoinNode->outputType()); diff --git a/velox/tool/trace/HashJoinReplayer.h b/velox/tool/trace/HashJoinReplayer.h index 178ed952b173..c8fe9b2c8f9e 100644 --- a/velox/tool/trace/HashJoinReplayer.h +++ b/velox/tool/trace/HashJoinReplayer.h @@ -28,8 +28,15 @@ class HashJoinReplayer final : public OperatorReplayerBase { const std::string& queryId, const std::string& taskId, const std::string& nodeId, - const std::string& operatorType) - : OperatorReplayerBase(rootDir, queryId, taskId, nodeId, operatorType) {} + const std::string& operatorType, + const std::string& driverIds) + : OperatorReplayerBase( + rootDir, + queryId, + taskId, + nodeId, + operatorType, + driverIds) {} private: core::PlanNodePtr createPlanNode( diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index e73ed36ca572..1102b5cde4b6 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -33,7 +33,8 @@ OperatorReplayerBase::OperatorReplayerBase( std::string queryId, std::string taskId, std::string nodeId, - std::string operatorType) + std::string operatorType, + const std::string& driverIds) : queryId_(std::string(std::move(queryId))), 
taskId_(std::move(taskId)), nodeId_(std::move(nodeId)), @@ -43,10 +44,12 @@ OperatorReplayerBase::OperatorReplayerBase( nodeTraceDir_(exec::trace::getNodeTraceDirectory(taskTraceDir_, nodeId_)), fs_(filesystems::getFileSystem(taskTraceDir_, nullptr)), pipelineIds_(exec::trace::listPipelineIds(nodeTraceDir_, fs_)), - maxDrivers_(exec::trace::getNumDrivers( - nodeTraceDir_, - pipelineIds_.front(), - fs_)) { + driverIds_( + driverIds.empty() ? exec::trace::listDriverIds( + nodeTraceDir_, + pipelineIds_.front(), + fs_) + : exec::trace::extractDriverIds(driverIds)) { VELOX_USER_CHECK(!taskTraceDir_.empty()); VELOX_USER_CHECK(!taskId_.empty()); VELOX_USER_CHECK(!nodeId_.empty()); @@ -66,7 +69,7 @@ OperatorReplayerBase::OperatorReplayerBase( RowVectorPtr OperatorReplayerBase::run() { const auto restoredPlanNode = createPlan(); return exec::test::AssertQueryBuilder(restoredPlanNode) - .maxDrivers(maxDrivers_) + .maxDrivers(driverIds_.size()) .configs(queryConfigs_) .connectorSessionProperties(connectorConfigs_) .copyResults(memory::MemoryManager::getInstance()->tracePool()); @@ -87,6 +90,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const { .traceScan( nodeTraceDir_, pipelineIds_.front(), + driverIds_, exec::trace::getDataType(planFragment_, nodeId_)) .addNode(replayNodeFactory(replayNode)) .planNode(); diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index 97ba5d682203..692ea7b82595 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -32,7 +32,8 @@ class OperatorReplayerBase { std::string queryId, std::string taskId, std::string nodeId, - std::string operatorType); + std::string operatorType, + const std::string& driverIds); virtual ~OperatorReplayerBase() = default; OperatorReplayerBase(const OperatorReplayerBase& other) = delete; @@ -59,8 +60,7 @@ class OperatorReplayerBase { const std::string nodeTraceDir_; const std::shared_ptr fs_; const std::vector 
pipelineIds_; - const uint32_t maxDrivers_; - + const std::vector driverIds_; const std::shared_ptr planNodeIdGenerator_{ std::make_shared()}; diff --git a/velox/tool/trace/PartitionedOutputReplayer.cpp b/velox/tool/trace/PartitionedOutputReplayer.cpp index 80a19f670f4f..d232b696d964 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.cpp +++ b/velox/tool/trace/PartitionedOutputReplayer.cpp @@ -111,8 +111,15 @@ PartitionedOutputReplayer::PartitionedOutputReplayer( const std::string& nodeId, VectorSerde::Kind serdeKind, const std::string& operatorType, + const std::string& driverIds, const ConsumerCallBack& consumerCb) - : OperatorReplayerBase(traceDir, queryId, taskId, nodeId, operatorType), + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + operatorType, + driverIds), originalNode_(dynamic_cast( core::PlanNode::findFirstNode( planFragment_.get(), @@ -134,7 +141,7 @@ RowVectorPtr PartitionedOutputReplayer::run() { 0, createQueryContext(queryConfigs_, executor_.get()), Task::ExecutionMode::kParallel); - task->start(maxDrivers_); + task->start(driverIds_.size()); consumeAllData( bufferManager_, diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index 873bb185addc..0855cbe483d9 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -47,6 +47,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { const std::string& nodeId, VectorSerde::Kind serdeKind, const std::string& operatorType, + const std::string& driverIds, const ConsumerCallBack& consumerCb = [](auto partition, auto page) {}); RowVectorPtr run() override; diff --git a/velox/tool/trace/TableScanReplayer.cpp b/velox/tool/trace/TableScanReplayer.cpp index de4aae54f061..4c591cfbf0b4 100644 --- a/velox/tool/trace/TableScanReplayer.cpp +++ b/velox/tool/trace/TableScanReplayer.cpp @@ -30,7 +30,7 @@ namespace facebook::velox::tool::trace { RowVectorPtr 
TableScanReplayer::run() { const auto plan = createPlan(); return exec::test::AssertQueryBuilder(plan) - .maxDrivers(maxDrivers_) + .maxDrivers(driverIds_.size()) .configs(queryConfigs_) .connectorSessionProperties(connectorConfigs_) .splits(getSplits()) @@ -52,14 +52,9 @@ core::PlanNodePtr TableScanReplayer::createPlanNode( std::vector TableScanReplayer::getSplits() const { std::vector splitInfoDirs; - if (driverId_ != -1) { + for (const auto driverId : driverIds_) { splitInfoDirs.push_back(exec::trace::getOpTraceDirectory( - nodeTraceDir_, pipelineIds_.front(), driverId_)); - } else { - for (auto i = 0; i < maxDrivers_; ++i) { - splitInfoDirs.push_back(exec::trace::getOpTraceDirectory( - nodeTraceDir_, pipelineIds_.front(), i)); - } + nodeTraceDir_, pipelineIds_.front(), driverId)); } const auto splitStrs = exec::trace::OperatorTraceSplitReader( diff --git a/velox/tool/trace/TableScanReplayer.h b/velox/tool/trace/TableScanReplayer.h index e7afa3b4b33d..93a3a22a7d9e 100644 --- a/velox/tool/trace/TableScanReplayer.h +++ b/velox/tool/trace/TableScanReplayer.h @@ -33,9 +33,14 @@ class TableScanReplayer final : public OperatorReplayerBase { const std::string& taskId, const std::string& nodeId, const std::string& operatorType, - const int32_t driverId = -1) - : OperatorReplayerBase(traceDir, queryId, taskId, nodeId, operatorType), - driverId_(driverId) {} + const std::string& driverIds) + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + operatorType, + driverIds) {} RowVectorPtr run() override; @@ -46,8 +51,6 @@ class TableScanReplayer final : public OperatorReplayerBase { const core::PlanNodePtr& /*source*/) const override; std::vector getSplits() const; - - const int32_t driverId_; }; } // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TableWriterReplayer.h b/velox/tool/trace/TableWriterReplayer.h index ac67f8d6cc55..d74e3f66430e 100644 --- a/velox/tool/trace/TableWriterReplayer.h +++ b/velox/tool/trace/TableWriterReplayer.h 
@@ -32,8 +32,15 @@ class TableWriterReplayer final : public OperatorReplayerBase { const std::string& taskId, const std::string& nodeId, const std::string& operatorType, + const std::string& driverIds, const std::string& replayOutputDir) - : OperatorReplayerBase(traceDir, queryId, taskId, nodeId, operatorType), + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + operatorType, + driverIds), replayOutputDir_(replayOutputDir) { VELOX_CHECK(!replayOutputDir_.empty()); } diff --git a/velox/tool/trace/TraceReplayRunner.cpp b/velox/tool/trace/TraceReplayRunner.cpp index edc5a1f08612..83e48ef6a7ac 100644 --- a/velox/tool/trace/TraceReplayRunner.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -70,7 +70,7 @@ DEFINE_string( "Specify the target task id, if empty, show the summary of all the traced " "query task."); DEFINE_string(node_id, "", "Specify the target node id."); -DEFINE_int32(driver_id, -1, "Specify the target driver id."); +DEFINE_string(driver_ids, "", "A comma-separated list of target driver ids"); DEFINE_string( table_writer_output_dir, "", @@ -298,6 +298,7 @@ TraceReplayRunner::createReplayer() const { FLAGS_task_id, FLAGS_node_id, traceNodeName, + FLAGS_driver_ids, FLAGS_table_writer_output_dir); } else if (traceNodeName == "Aggregation") { replayer = std::make_unique( @@ -305,7 +306,8 @@ TraceReplayRunner::createReplayer() const { FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, - traceNodeName); + traceNodeName, + FLAGS_driver_ids); } else if (traceNodeName == "PartitionedOutput") { replayer = std::make_unique( FLAGS_root_dir, @@ -313,28 +315,32 @@ TraceReplayRunner::createReplayer() const { FLAGS_task_id, FLAGS_node_id, getVectorSerdeKind(), - traceNodeName); + traceNodeName, + FLAGS_driver_ids); } else if (traceNodeName == "TableScan") { replayer = std::make_unique( FLAGS_root_dir, FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, - traceNodeName); + traceNodeName, + FLAGS_driver_ids); } else if (traceNodeName == "Filter" || traceNodeName 
== "Project") { replayer = std::make_unique( FLAGS_root_dir, FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, - traceNodeName); + traceNodeName, + FLAGS_driver_ids); } else if (traceNodeName == "HashJoin") { replayer = std::make_unique( FLAGS_root_dir, FLAGS_query_id, FLAGS_task_id, FLAGS_node_id, - traceNodeName); + traceNodeName, + FLAGS_driver_ids); } else { VELOX_UNSUPPORTED("Unsupported operator type: {}", traceNodeName); } diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h index be46dce7c00c..fa2bef827874 100644 --- a/velox/tool/trace/TraceReplayRunner.h +++ b/velox/tool/trace/TraceReplayRunner.h @@ -29,6 +29,7 @@ DECLARE_string(query_id); DECLARE_string(task_id); DECLARE_string(node_id); DECLARE_int32(driver_id); +DECLARE_string(driver_ids); DECLARE_string(table_writer_output_dir); DECLARE_double(hiveConnectorExecutorHwMultiplier); DECLARE_int32(shuffle_serialization_format); diff --git a/velox/tool/trace/tests/AggregationReplayerTest.cpp b/velox/tool/trace/tests/AggregationReplayerTest.cpp index f31dbf0628c3..919e86a74340 100644 --- a/velox/tool/trace/tests/AggregationReplayerTest.cpp +++ b/velox/tool/trace/tests/AggregationReplayerTest.cpp @@ -203,7 +203,8 @@ TEST_F(AggregationReplayerTest, test) { task->queryCtx()->queryId(), task->taskId(), traceNodeId_, - "Aggregation") + "Aggregation", + "") .run(); assertEqualResults({results}, {replayingResult}); } diff --git a/velox/tool/trace/tests/FilterProjectReplayerTest.cpp b/velox/tool/trace/tests/FilterProjectReplayerTest.cpp index 34909cc9bfdc..791889704a21 100644 --- a/velox/tool/trace/tests/FilterProjectReplayerTest.cpp +++ b/velox/tool/trace/tests/FilterProjectReplayerTest.cpp @@ -192,7 +192,8 @@ TEST_F(FilterProjectReplayerTest, filterProject) { task->queryCtx()->queryId(), task->taskId(), projectNodeId_, - "FilterProject") + "FilterProject", + "") .run(); assertEqualResults({result}, {replayingResult}); } @@ -226,7 +227,8 @@ TEST_F(FilterProjectReplayerTest, 
filterOnly) { task->queryCtx()->queryId(), task->taskId(), filterNodeId_, - "FilterProject") + "FilterProject", + "") .run(); assertEqualResults({result}, {replayingResult}); } @@ -259,8 +261,27 @@ TEST_F(FilterProjectReplayerTest, projectOnly) { task->queryCtx()->queryId(), task->taskId(), projectNodeId_, - "FilterProject") + "FilterProject", + "") .run(); assertEqualResults({result}, {replayingResult}); + + auto replayingResult1 = FilterProjectReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + projectNodeId_, + "FilterProject", + "0,2") + .run(); + auto replayingResult2 = FilterProjectReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + projectNodeId_, + "FilterProject", + "1,3") + .run(); + assertEqualResults({result}, {replayingResult1, replayingResult2}); } } // namespace facebook::velox::tool::trace::test diff --git a/velox/tool/trace/tests/HashJoinReplayerTest.cpp b/velox/tool/trace/tests/HashJoinReplayerTest.cpp index c0a6f3b763c4..53567d48f027 100644 --- a/velox/tool/trace/tests/HashJoinReplayerTest.cpp +++ b/velox/tool/trace/tests/HashJoinReplayerTest.cpp @@ -22,10 +22,13 @@ #include #include "velox/common/file/FileSystems.h" +#include "velox/common/file/Utils.h" +#include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/common/hyperloglog/SparseHll.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/PartitionFunction.h" +#include "velox/exec/PlanNodeStats.h" #include "velox/exec/TableWriter.h" #include "velox/exec/TraceUtil.h" #include "velox/exec/tests/utils/ArbitratorTestUtil.h" @@ -35,10 +38,6 @@ #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/tool/trace/HashJoinReplayer.h" - -#include "velox/common/file/Utils.h" -#include "velox/exec/PlanNodeStats.h" - #include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox; @@ -51,6 +50,7 @@ using 
namespace facebook::velox::connector::hive; using namespace facebook::velox::dwio::common; using namespace facebook::velox::common::testutil; using namespace facebook::velox::common::hll; +using namespace facebook::velox::tests::utils; namespace facebook::velox::tool::trace::test { class HashJoinReplayerTest : public HiveConnectorTestBase { @@ -58,7 +58,7 @@ class HashJoinReplayerTest : public HiveConnectorTestBase { static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); HiveConnectorTestBase::SetUpTestCase(); - filesystems::registerLocalFileSystem(); + registerFaultyFileSystem(); if (!isRegisteredVectorSerde()) { serializer::presto::PrestoVectorSerde::registerVectorSerde(); } @@ -214,7 +214,8 @@ TEST_F(HashJoinReplayerTest, basic) { probeInput_, buildInput_); AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); - traceBuilder.config(core::QueryConfig::kQueryTraceEnabled, true) + traceBuilder.maxDrivers(4) + .config(core::QueryConfig::kQueryTraceEnabled, true) .config(core::QueryConfig::kQueryTraceDir, traceRoot) .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") @@ -232,11 +233,89 @@ TEST_F(HashJoinReplayerTest, basic) { task->queryCtx()->queryId(), task->taskId(), traceNodeId_, - "HashJoin") + "HashJoin", + "") .run(); assertEqualResults({result}, {replayingResult}); } +TEST_F(HashJoinReplayerTest, partialDriverIds) { + const std::shared_ptr testDir = + TempDirectoryPath::create(true); + const std::string tableDir = + fmt::format("{}/{}", testDir->getPath(), "table"); + const auto planWithSplits = createPlan( + tableDir, + core::JoinType::kInner, + probeKeys_, + buildKeys_, + probeInput_, + buildInput_); + AssertQueryBuilder builder(planWithSplits.plan); + for (const auto& [planNodeId, nodeSplits] : planWithSplits.splits) { + builder.splits(planNodeId, nodeSplits); + } + const auto result = builder.copyResults(pool()); + + const auto traceRoot = + 
fmt::format("{}/{}/traceRoot/", testDir->getPath(), "basic"); + std::shared_ptr task; + auto tracePlanWithSplits = createPlan( + tableDir, + core::JoinType::kInner, + probeKeys_, + buildKeys_, + probeInput_, + buildInput_); + AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); + traceBuilder.maxDrivers(4) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_); + for (const auto& [planNodeId, nodeSplits] : tracePlanWithSplits.splits) { + traceBuilder.splits(planNodeId, nodeSplits); + } + auto traceResult = traceBuilder.copyResults(pool(), task); + + assertEqualResults({result}, {traceResult}); + + const auto taskId = task->taskId(); + const auto taskTraceDir = + exec::trace::getTaskTraceDirectory(traceRoot, *task); + const auto opTraceDir = + exec::trace::getOpTraceDirectory(taskTraceDir, traceNodeId_, 0, 0); + const auto opTraceDataFile = exec::trace::getOpTraceInputFilePath(opTraceDir); + auto faultyFs = faultyFileSystem(); + faultyFs->setFileInjectionHook([&](FaultFileOperation* op) { + if (op->type == FaultFileOperation::Type::kRead && + op->path == opTraceDataFile) { + VELOX_FAIL("Read wrong data file {}", opTraceDataFile); + } + }); + + VELOX_ASSERT_THROW( + HashJoinReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + "HashJoin", + "0") + .run(), + "Read wrong data file"); + HashJoinReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + "HashJoin", + "1,3") + .run(); + faultyFs->clearFileFaultInjections(); +} + DEBUG_ONLY_TEST_F(HashJoinReplayerTest, hashBuildSpill) { const auto planWithSplits = createPlan( tableDir_, @@ -307,7 +386,8 @@ DEBUG_ONLY_TEST_F(HashJoinReplayerTest, hashBuildSpill) { task->queryCtx()->queryId(), task->taskId(), 
traceNodeId_, - "HashJoin") + "HashJoin", + "") .run(); assertEqualResults({result}, {replayingResult}); } @@ -386,7 +466,8 @@ DEBUG_ONLY_TEST_F(HashJoinReplayerTest, hashProbeSpill) { task->queryCtx()->queryId(), task->taskId(), traceNodeId_, - "HashJoin") + "HashJoin", + "") .run(); assertEqualResults({result}, {replayingResult}); } diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp index c9fd1498eef7..28b821cadea3 100644 --- a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -159,7 +159,8 @@ TEST_P(PartitionedOutputReplayerTest, defaultConsumer) { originalTask->taskId(), planNodeId, GetParam(), - "PartitionedOutput") + "PartitionedOutput", + "") .run()); } @@ -251,6 +252,7 @@ TEST_P(PartitionedOutputReplayerTest, basic) { planNodeId, GetParam(), "PartitionedOutput", + "", [&](auto partition, auto page) { replayedPartitionedResults[partition].push_back(std::move(page)); }) diff --git a/velox/tool/trace/tests/TableScanReplayerTest.cpp b/velox/tool/trace/tests/TableScanReplayerTest.cpp index 248bfb78d18c..22b8c66d221c 100644 --- a/velox/tool/trace/tests/TableScanReplayerTest.cpp +++ b/velox/tool/trace/tests/TableScanReplayerTest.cpp @@ -137,9 +137,28 @@ TEST_F(TableScanReplayerTest, basic) { task->queryCtx()->queryId(), task->taskId(), traceNodeId_, - "TableScan") + "TableScan", + "") .run(); assertEqualResults({results}, {replayingResult}); + + const auto replayingResult1 = TableScanReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + "TableScan", + "0,2") + .run(); + const auto replayingResult2 = TableScanReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + "TableScan", + "1,3") + .run(); + assertEqualResults({results}, {replayingResult1, replayingResult2}); } TEST_F(TableScanReplayerTest, columnPrunning) { @@ -181,7 +200,8 @@ 
TEST_F(TableScanReplayerTest, columnPrunning) { task->queryCtx()->queryId(), task->taskId(), traceNodeId_, - "TableScan") + "TableScan", + "") .run(); assertEqualResults({results}, {replayingResult}); } @@ -241,7 +261,8 @@ TEST_F(TableScanReplayerTest, subfieldPrunning) { task->queryCtx()->queryId(), task->taskId(), traceNodeId_, - "TableScan") + "TableScan", + "") .run(); assertEqualResults({results}, {replayingResult}); } diff --git a/velox/tool/trace/tests/TableWriterReplayerTest.cpp b/velox/tool/trace/tests/TableWriterReplayerTest.cpp index e7b57cae399e..759ec592e7dd 100644 --- a/velox/tool/trace/tests/TableWriterReplayerTest.cpp +++ b/velox/tool/trace/tests/TableWriterReplayerTest.cpp @@ -301,6 +301,7 @@ TEST_F(TableWriterReplayerTest, basic) { task->taskId(), "1", "TableWriter", + "", traceOutputDir->getPath()) .run(); @@ -426,6 +427,7 @@ TEST_F(TableWriterReplayerTest, partitionWrite) { task->taskId(), tableWriteNodeId, "TableWriter", + "", traceOutputDir->getPath()) .run(); actualPartitionDirectories = getLeafSubdirectories(traceOutputDir->getPath());