Add clean shutdown for ExchangeFuzzer runs (#11191)
Summary:
Pull Request resolved: #11191

After an ExchangeFuzzer run, all the Tasks need to shut down. As part of shutting
down, they hold a future that aborts any Tasks still running, since they should all
terminate as a unit. There's a chance that not all Tasks have completely finished by
the time the function returns, i.e. this future hasn't run yet. This can lead to TSAN
issues, because there's a race between the deletion of the Tasks and these futures. To
ensure everything happens in the right order, I've added a clean shutdown step at the
end of ExchangeFuzzer runs that waits until all the Tasks have finished.
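
The ordering described above can be illustrated with a minimal, self-contained sketch. It does not use Velox: `SketchTask`, `waitForCompletion`, and `runAndShutDownCleanly` are hypothetical names invented for this example, and the worker threads only stand in for the asynchronous shutdown work. The point is simply that the function waits for every task before it returns and lets the tasks be destroyed.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a task; this is not the Velox Task class.
class SketchTask {
 public:
  // Marks the task finished and wakes any waiter.
  void finish() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      finished_ = true;
    }
    cv_.notify_all();
  }

  // Blocks until finish() has been called or the timeout elapses.
  // Returns true if the task finished in time.
  bool waitForCompletion(std::chrono::microseconds timeout) {
    std::unique_lock<std::mutex> lock(mutex_);
    return cv_.wait_for(lock, timeout, [this] { return finished_; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool finished_{false};
};

// Starts background work for each task, then waits for every task to
// finish before returning, so nothing is torn down while work is pending.
inline bool runAndShutDownCleanly(int numTasks) {
  assert(numTasks >= 0);
  std::vector<std::shared_ptr<SketchTask>> tasks;
  for (int i = 0; i < numTasks; ++i) {
    tasks.push_back(std::make_shared<SketchTask>());
  }

  std::vector<std::thread> workers;
  for (const auto& task : tasks) {
    workers.emplace_back([task] {
      // Simulated asynchronous shutdown work.
      std::this_thread::sleep_for(std::chrono::milliseconds(5));
      task->finish();
    });
  }

  // Clean shutdown step: wait on every task, not just the first one.
  bool allFinished = true;
  for (const auto& task : tasks) {
    allFinished = task->waitForCompletion(std::chrono::seconds(1)) && allFinished;
  }
  for (auto& worker : workers) {
    worker.join();
  }
  return allFinished;
}
```

Calling `runAndShutDownCleanly(4)` should return `true` under these assumptions, since each simulated worker finishes well inside the one-second timeout.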

Reviewed By: xiaoxmeng

Differential Revision: D64010841

fbshipit-source-id: cac51eab3d37aea55ab4a1c19c6781005ecc9b8a
Kevin Wilfong authored and facebook-github-bot committed Oct 10, 2024
1 parent 900e61c commit 7356542
Showing 1 changed file with 36 additions and 0 deletions.
36 changes: 36 additions & 0 deletions velox/exec/tests/ExchangeFuzzer.cpp
@@ -284,6 +284,42 @@ class ExchangeFuzzer : public VectorTestBase {
      saveRepro(vectors, params);
      return false;
    }

    for (const auto& task : tasks) {
      if (!waitForTaskCompletion(task.get(), 1'000'000)) {
        if (task->state() == TaskState::kFailed) {
          try {
            std::rethrow_exception(task->error());
          } catch (const std::exception& taskException) {
            // This must be an unexpected exception.
            LOG(ERROR) << "Task " << task->toString() << " failed with error "
                       << taskException.what();
            saveRepro(vectors, params);
            return false;
          }
        } else if (task->state() == TaskState::kRunning) {
          VELOX_FAIL(
              "Timed out waiting for task to complete, task: {} {}",
              task->toString(),
              taskStateString(task->state()));
        } else if (task->state() != TaskState::kFinished) {
          VELOX_FAIL(
              "Task {} ended in unexpected state {}",
              task->toString(),
              taskStateString(task->state()));
        } else if (task->numFinishedDrivers() != task->numTotalDrivers()) {
          VELOX_FAIL(
              "Task {} finished but it has {} drivers still running",
              task->toString(),
              task->numTotalDrivers() - task->numFinishedDrivers());
        }
        // There's a chance that the task exceeded the timeout to finish but
        // managed to finish in the gap before these if statements got executed,
        // in this case just let it slide since we ended up in the right place
        // eventually.
      }
    }

    return true;
  }
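
The branch order in the hunk above, taken after a wait has timed out, can be summarized as a small pure function. This is only a sketch for reading the diff: `SketchState`, `Verdict`, and `classifyAfterTimeout` are hypothetical names, the state list is an assumption rather than Velox's actual enum, and the real code logs, saves a repro, or calls `VELOX_FAIL` instead of returning a value.

```cpp
#include <cassert>

// Hypothetical mirror of the task states the diff checks; not Velox's enum.
enum class SketchState { kRunning, kFinished, kFailed, kCanceled, kAborted };

// Hypothetical outcome labels, one per branch in the diff.
enum class Verdict {
  kOk,                  // Finished late but cleanly: let it slide.
  kSaveReproAndFail,    // Task failed: save a repro and report failure.
  kTimedOut,            // Task is still running after the timeout.
  kUnexpectedState,     // Neither running, failed, nor finished.
  kDriversStillRunning  // Finished, but some drivers have not finished.
};

// Classifies a task whose wait timed out, checking conditions in the same
// order as the diff: failed, running, not finished, driver count mismatch.
inline Verdict classifyAfterTimeout(
    SketchState state,
    int finishedDrivers,
    int totalDrivers) {
  assert(finishedDrivers >= 0 && finishedDrivers <= totalDrivers);
  if (state == SketchState::kFailed) {
    return Verdict::kSaveReproAndFail;
  }
  if (state == SketchState::kRunning) {
    return Verdict::kTimedOut;
  }
  if (state != SketchState::kFinished) {
    return Verdict::kUnexpectedState;
  }
  if (finishedDrivers != totalDrivers) {
    return Verdict::kDriversStillRunning;
  }
  return Verdict::kOk;
}
```

The final `kOk` case corresponds to the trailing comment in the diff: the task missed the timeout but reached the finished state, with all drivers done, before the checks ran.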
