Skip to content

Commit

Permalink
Fix flakey FilterProjectReplayerTest (#11436)
Browse files Browse the repository at this point in the history
Summary:
In FilterProjectReplayerTest, some drivers may not be assigned any
splits during execution because the splits are consumed by other drivers.
Those drivers create an empty trace data file, which causes the
replayer to crash because `OperatorTraceInputReader` throws if
the trace input data file is empty.

This PR adds a check in `OperatorTraceInputReader` before creating the
input stream, and skips reading when the trace input data file is empty.

Resolve #11430

Pull Request resolved: #11436

Reviewed By: bikramSingh91

Differential Revision: D65484118

Pulled By: xiaoxmeng

fbshipit-source-id: a0cd5f59d6320e87f56555021c3ba489d03d927f
  • Loading branch information
duanmeng authored and facebook-github-bot committed Nov 5, 2024
1 parent f9b24d5 commit a5ae228
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 32 deletions.
10 changes: 9 additions & 1 deletion velox/exec/OperatorTraceReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ OperatorTraceInputReader::OperatorTraceInputReader(
pool_(pool),
inputStream_(getInputStream()) {
VELOX_CHECK_NOT_NULL(dataType_);
VELOX_CHECK_NOT_NULL(inputStream_);
}

bool OperatorTraceInputReader::read(RowVectorPtr& batch) const {
if (inputStream_ == nullptr) {
return false;
}

if (inputStream_->atEnd()) {
batch = nullptr;
return false;
Expand All @@ -49,6 +52,11 @@ bool OperatorTraceInputReader::read(RowVectorPtr& batch) const {
std::unique_ptr<common::FileInputStream>
OperatorTraceInputReader::getInputStream() const {
auto traceFile = fs_->openFileForRead(getOpTraceInputFilePath(traceDir_));
if (traceFile->size() == 0) {
LOG(WARNING) << "Operator trace input file is empty: "
<< getOpTraceInputFilePath(traceDir_);
return nullptr;
}
// TODO: Make the buffer size configurable.
return std::make_unique<common::FileInputStream>(
std::move(traceFile), 1 << 20, pool_);
Expand Down
6 changes: 3 additions & 3 deletions velox/tool/trace/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
add_executable(
velox_tool_trace_test
AggregationReplayerTest.cpp
PartitionedOutputReplayerTest.cpp
TableWriterReplayerTest.cpp
FilterProjectReplayerTest.cpp
HashJoinReplayerTest.cpp
FilterProjectReplayerTest.cpp)
PartitionedOutputReplayerTest.cpp
TableWriterReplayerTest.cpp)

add_test(
NAME velox_tool_trace_test
Expand Down
67 changes: 39 additions & 28 deletions velox/tool/trace/tests/FilterProjectReplayerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class FilterProjectReplayerTest : public HiveConnectorTestBase {
const std::string& path,
memory::MemoryPool* writerPool) {
std::vector<Split> splits;
for (auto i = 0; i < 8; ++i) {
for (auto i = 0; i < 4; ++i) {
const std::string filePath = fmt::format("{}/{}", path, i);
writeToFile(filePath, inputs);
splits.emplace_back(makeHiveConnectorSplit(filePath));
Expand Down Expand Up @@ -159,33 +159,44 @@ TEST_F(FilterProjectReplayerTest, filterProject) {
AssertQueryBuilder builder(planWithSplits.plan);
const auto result = builder.splits(planWithSplits.splits).copyResults(pool());

const auto traceRoot = fmt::format("{}/{}", testDir_->getPath(), "basic");
const auto tracePlanWithSplits = createPlan(PlanMode::FilterProject);
std::shared_ptr<Task> task;
AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan);
traceBuilder.maxDrivers(2)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)
.config(core::QueryConfig::kQueryTraceTaskRegExp, ".*")
.config(
core::QueryConfig::kQueryTraceNodeIds,
fmt::format("{},{}", filterNodeId_, projectNodeId_));
auto traceResult =
traceBuilder.splits(tracePlanWithSplits.splits).copyResults(pool(), task);
struct {
uint32_t maxDrivers;

assertEqualResults({result}, {traceResult});
std::string debugString() const {
return fmt::format("maxDrivers: {}", maxDrivers);
}
} testSettings[]{{1}, {4}, {8}};

const auto taskId = task->taskId();
auto replayingResult = FilterProjectReplayer(
traceRoot,
task->queryCtx()->queryId(),
task->taskId(),
projectNodeId_,
0,
"FilterProject")
.run();
assertEqualResults({result}, {replayingResult});
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
const auto traceRoot = fmt::format("{}/{}", testDir_->getPath(), "basic");
const auto tracePlanWithSplits = createPlan(PlanMode::FilterProject);
std::shared_ptr<Task> task;
AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan);
traceBuilder.maxDrivers(4)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)
.config(core::QueryConfig::kQueryTraceTaskRegExp, ".*")
.config(
core::QueryConfig::kQueryTraceNodeIds,
fmt::format("{},{}", filterNodeId_, projectNodeId_));
auto traceResult = traceBuilder.splits(tracePlanWithSplits.splits)
.copyResults(pool(), task);

assertEqualResults({result}, {traceResult});

const auto taskId = task->taskId();
auto replayingResult = FilterProjectReplayer(
traceRoot,
task->queryCtx()->queryId(),
task->taskId(),
projectNodeId_,
0,
"FilterProject")
.run();
assertEqualResults({result}, {replayingResult});
}
}

TEST_F(FilterProjectReplayerTest, filterOnly) {
Expand All @@ -197,7 +208,7 @@ TEST_F(FilterProjectReplayerTest, filterOnly) {
const auto tracePlanWithSplits = createPlan(PlanMode::FilterOnly);
std::shared_ptr<Task> task;
AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan);
traceBuilder.maxDrivers(2)
traceBuilder.maxDrivers(4)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)
Expand Down Expand Up @@ -231,7 +242,7 @@ TEST_F(FilterProjectReplayerTest, projectOnly) {
const auto tracePlanWithSplits = createPlan(PlanMode::ProjectOnly);
std::shared_ptr<Task> task;
AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan);
traceBuilder.maxDrivers(2)
traceBuilder.maxDrivers(4)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)
Expand Down

0 comments on commit a5ae228

Please sign in to comment.