From a5ae2283211fc45b2ab2b79dc44c9007e46794d7 Mon Sep 17 00:00:00 2001 From: duanmeng Date: Tue, 5 Nov 2024 11:20:05 -0800 Subject: [PATCH] Fix flakey FilterProjectReplayerTest (#11436) Summary: In FilterProjectReplayerTest, some drivers may not be assigned any splits during execution because the splits are consumed by other drivers. Such drivers create an empty trace data file, which causes the replay to crash because `OperatorTraceInputReader` throws if the trace input data file is empty. This PR adds a check in `OperatorTraceInputReader` before creating the input stream and skips reading empty data files. Resolve https://github.com/facebookincubator/velox/issues/11430 Pull Request resolved: https://github.com/facebookincubator/velox/pull/11436 Reviewed By: bikramSingh91 Differential Revision: D65484118 Pulled By: xiaoxmeng fbshipit-source-id: a0cd5f59d6320e87f56555021c3ba489d03d927f --- velox/exec/OperatorTraceReader.cpp | 10 ++- velox/tool/trace/tests/CMakeLists.txt | 6 +- .../trace/tests/FilterProjectReplayerTest.cpp | 67 +++++++++++-------- 3 files changed, 51 insertions(+), 32 deletions(-) diff --git a/velox/exec/OperatorTraceReader.cpp b/velox/exec/OperatorTraceReader.cpp index 4477717b4042..90e68af715d2 100644 --- a/velox/exec/OperatorTraceReader.cpp +++ b/velox/exec/OperatorTraceReader.cpp @@ -32,10 +32,13 @@ OperatorTraceInputReader::OperatorTraceInputReader( pool_(pool), inputStream_(getInputStream()) { VELOX_CHECK_NOT_NULL(dataType_); - VELOX_CHECK_NOT_NULL(inputStream_); } bool OperatorTraceInputReader::read(RowVectorPtr& batch) const { + if (inputStream_ == nullptr) { + return false; + } + if (inputStream_->atEnd()) { batch = nullptr; return false; @@ -49,6 +52,11 @@ bool OperatorTraceInputReader::read(RowVectorPtr& batch) const { std::unique_ptr OperatorTraceInputReader::getInputStream() const { auto traceFile = fs_->openFileForRead(getOpTraceInputFilePath(traceDir_)); + if (traceFile->size() == 0) { + LOG(WARNING) << "Operator trace input file is empty: " 
+ << getOpTraceInputFilePath(traceDir_); + return nullptr; + } // TODO: Make the buffer size configurable. return std::make_unique( std::move(traceFile), 1 << 20, pool_); diff --git a/velox/tool/trace/tests/CMakeLists.txt b/velox/tool/trace/tests/CMakeLists.txt index 9a5d37701271..fe2c66a5fd3c 100644 --- a/velox/tool/trace/tests/CMakeLists.txt +++ b/velox/tool/trace/tests/CMakeLists.txt @@ -15,10 +15,10 @@ add_executable( velox_tool_trace_test AggregationReplayerTest.cpp - PartitionedOutputReplayerTest.cpp - TableWriterReplayerTest.cpp + FilterProjectReplayerTest.cpp HashJoinReplayerTest.cpp - FilterProjectReplayerTest.cpp) + PartitionedOutputReplayerTest.cpp + TableWriterReplayerTest.cpp) add_test( NAME velox_tool_trace_test diff --git a/velox/tool/trace/tests/FilterProjectReplayerTest.cpp b/velox/tool/trace/tests/FilterProjectReplayerTest.cpp index 34938879a65d..55244e513988 100644 --- a/velox/tool/trace/tests/FilterProjectReplayerTest.cpp +++ b/velox/tool/trace/tests/FilterProjectReplayerTest.cpp @@ -100,7 +100,7 @@ class FilterProjectReplayerTest : public HiveConnectorTestBase { const std::string& path, memory::MemoryPool* writerPool) { std::vector splits; - for (auto i = 0; i < 8; ++i) { + for (auto i = 0; i < 4; ++i) { const std::string filePath = fmt::format("{}/{}", path, i); writeToFile(filePath, inputs); splits.emplace_back(makeHiveConnectorSplit(filePath)); @@ -159,33 +159,44 @@ TEST_F(FilterProjectReplayerTest, filterProject) { AssertQueryBuilder builder(planWithSplits.plan); const auto result = builder.splits(planWithSplits.splits).copyResults(pool()); - const auto traceRoot = fmt::format("{}/{}", testDir_->getPath(), "basic"); - const auto tracePlanWithSplits = createPlan(PlanMode::FilterProject); - std::shared_ptr task; - AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); - traceBuilder.maxDrivers(2) - .config(core::QueryConfig::kQueryTraceEnabled, true) - .config(core::QueryConfig::kQueryTraceDir, traceRoot) - 
.config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) - .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") - .config( - core::QueryConfig::kQueryTraceNodeIds, - fmt::format("{},{}", filterNodeId_, projectNodeId_)); - auto traceResult = - traceBuilder.splits(tracePlanWithSplits.splits).copyResults(pool(), task); + struct { + uint32_t maxDrivers; - assertEqualResults({result}, {traceResult}); + std::string debugString() const { + return fmt::format("maxDrivers: {}", maxDrivers); + } + } testSettings[]{{1}, {4}, {8}}; - const auto taskId = task->taskId(); - auto replayingResult = FilterProjectReplayer( - traceRoot, - task->queryCtx()->queryId(), - task->taskId(), - projectNodeId_, - 0, - "FilterProject") - .run(); - assertEqualResults({result}, {replayingResult}); + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + const auto traceRoot = fmt::format("{}/{}", testDir_->getPath(), "basic"); + const auto tracePlanWithSplits = createPlan(PlanMode::FilterProject); + std::shared_ptr task; + AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); + traceBuilder.maxDrivers(4) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config( + core::QueryConfig::kQueryTraceNodeIds, + fmt::format("{},{}", filterNodeId_, projectNodeId_)); + auto traceResult = traceBuilder.splits(tracePlanWithSplits.splits) + .copyResults(pool(), task); + + assertEqualResults({result}, {traceResult}); + + const auto taskId = task->taskId(); + auto replayingResult = FilterProjectReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + projectNodeId_, + 0, + "FilterProject") + .run(); + assertEqualResults({result}, {replayingResult}); + } } TEST_F(FilterProjectReplayerTest, filterOnly) { @@ -197,7 +208,7 @@ TEST_F(FilterProjectReplayerTest, filterOnly) { 
const auto tracePlanWithSplits = createPlan(PlanMode::FilterOnly); std::shared_ptr task; AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); - traceBuilder.maxDrivers(2) + traceBuilder.maxDrivers(4) .config(core::QueryConfig::kQueryTraceEnabled, true) .config(core::QueryConfig::kQueryTraceDir, traceRoot) .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) @@ -231,7 +242,7 @@ TEST_F(FilterProjectReplayerTest, projectOnly) { const auto tracePlanWithSplits = createPlan(PlanMode::ProjectOnly); std::shared_ptr task; AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); - traceBuilder.maxDrivers(2) + traceBuilder.maxDrivers(4) .config(core::QueryConfig::kQueryTraceEnabled, true) .config(core::QueryConfig::kQueryTraceDir, traceRoot) .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)