Skip to content

Commit

Permalink
feat(fuzzer): Support Join Filters in Join Fuzzer (#11473)
Browse files Browse the repository at this point in the history
Summary:

This changes adds the support for join filter 10% of the time. Currently it supports boolean and integer columns.

Reviewed By: kagamiori

Differential Revision: D65629460
  • Loading branch information
Daniel Hunte authored and facebook-github-bot committed Dec 10, 2024
1 parent d6bea9f commit 3ea4a51
Showing 1 changed file with 99 additions and 41 deletions.
140 changes: 99 additions & 41 deletions velox/exec/fuzzer/JoinFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,17 @@ class JoinFuzzer {
const std::vector<std::string>& buildKeys,
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
const std::vector<std::string>& outputColumns);
const std::vector<std::string>& outputColumns,
const std::string& filter);

JoinFuzzer::PlanWithSplits makeMergeJoinPlan(
core::JoinType joinType,
const std::vector<std::string>& probeKeys,
const std::vector<std::string>& buildKeys,
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
const std::vector<std::string>& outputColumns);
const std::vector<std::string>& outputColumns,
const std::string& filter);

// Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes.
// If withFilter is true, uses the equality filter between probeKeys and
Expand All @@ -162,7 +164,7 @@ class JoinFuzzer {
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
const std::vector<std::string>& outputColumns,
bool withFilter = true);
const std::string& filter);

// Makes the default query plan with table scan as inputs for both probe and
// build sides.
Expand All @@ -175,7 +177,8 @@ class JoinFuzzer {
const std::vector<std::string>& buildKeys,
const std::vector<Split>& probeSplits,
const std::vector<Split>& buildSplits,
const std::vector<std::string>& outputColumns);
const std::vector<std::string>& outputColumns,
const std::string& filter);

JoinFuzzer::PlanWithSplits makeMergeJoinPlanWithTableScan(
core::JoinType joinType,
Expand All @@ -185,7 +188,8 @@ class JoinFuzzer {
const std::vector<std::string>& buildKeys,
const std::vector<Split>& probeSplits,
const std::vector<Split>& buildSplits,
const std::vector<std::string>& outputColumns);
const std::vector<std::string>& outputColumns,
const std::string& filter);

// Returns a PlanWithSplits for NestedLoopJoin with inputs from TableScan
// nodes. If withFilter is true, uses the equiality filter between probeKeys
Expand All @@ -199,13 +203,14 @@ class JoinFuzzer {
const std::vector<Split>& probeSplits,
const std::vector<Split>& buildSplits,
const std::vector<std::string>& outputColumns,
bool withFilter = true);
const std::string& filter);

void makeAlternativePlans(
const core::PlanNodePtr& plan,
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
std::vector<JoinFuzzer::PlanWithSplits>& plans);
std::vector<JoinFuzzer::PlanWithSplits>& plans,
const std::string& filter);

// Makes the query plan from 'planWithTableScan' with grouped execution mode.
// Correspondingly, it replaces the table scan input splits with grouped ones.
Expand Down Expand Up @@ -249,7 +254,8 @@ class JoinFuzzer {
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
const std::vector<std::string>& outputColumns,
std::vector<PlanWithSplits>& altPlans);
std::vector<PlanWithSplits>& altPlans,
const std::string& filter);

// Splits the input into groups by partitioning on the join keys.
std::vector<std::vector<RowVectorPtr>> splitInputByGroup(
Expand Down Expand Up @@ -597,6 +603,12 @@ std::optional<core::JoinType> tryFlipJoinType(core::JoinType joinType) {
// Returns a plan with flipped join sides of the input hash join node. If the
// join type doesn't allow flipping, returns a nullptr.
core::PlanNodePtr tryFlipJoinSides(const core::HashJoinNode& joinNode) {
// Null-aware right semi project join doesn't support filter.
if (joinNode.filter() &&
joinNode.joinType() == core::JoinType::kLeftSemiProject &&
joinNode.isNullAware()) {
return nullptr;
}
auto flippedJoinType = tryFlipJoinType(joinNode.joinType());
if (!flippedJoinType.has_value()) {
return nullptr;
Expand Down Expand Up @@ -688,7 +700,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan(
const std::vector<std::string>& buildKeys,
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
const std::vector<std::string>& outputColumns) {
const std::vector<std::string>& outputColumns,
const std::string& filter) {
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan =
PlanBuilder(planNodeIdGenerator)
Expand All @@ -697,7 +710,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan(
probeKeys,
buildKeys,
PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(),
/*filter=*/"",
filter,
outputColumns,
joinType,
nullAware)
Expand All @@ -714,7 +727,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan(
const std::vector<std::string>& buildKeys,
const std::vector<Split>& probeSplits,
const std::vector<Split>& buildSplits,
const std::vector<std::string>& outputColumns) {
const std::vector<std::string>& outputColumns,
const std::string& filter) {
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId probeScanId;
core::PlanNodeId buildScanId;
Expand All @@ -728,7 +742,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan(
.tableScan(buildType)
.capturePlanNodeId(buildScanId)
.planNode(),
/*filter=*/"",
filter,
outputColumns,
joinType,
nullAware)
Expand Down Expand Up @@ -819,7 +833,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan(
const std::vector<std::string>& buildKeys,
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
const std::vector<std::string>& outputColumns) {
const std::vector<std::string>& outputColumns,
const std::string& filter) {
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
return JoinFuzzer::PlanWithSplits{PlanBuilder(planNodeIdGenerator)
.values(probeInput)
Expand All @@ -831,7 +846,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan(
.values(buildInput)
.orderBy(buildKeys, false)
.planNode(),
/*filter=*/"",
filter,
outputColumns,
joinType)
.planNode()};
Expand All @@ -844,10 +859,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan(
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
const std::vector<std::string>& outputColumns,
bool withFilter) {
const std::string& filter) {
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
const std::string filter =
withFilter ? makeJoinFilter(probeKeys, buildKeys) : "";
return JoinFuzzer::PlanWithSplits{
PlanBuilder(planNodeIdGenerator)
.values(probeInput)
Expand All @@ -863,7 +876,8 @@ void JoinFuzzer::makeAlternativePlans(
const core::PlanNodePtr& plan,
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
std::vector<JoinFuzzer::PlanWithSplits>& plans) {
std::vector<JoinFuzzer::PlanWithSplits>& plans,
const std::string& filter) {
auto joinNode = std::dynamic_pointer_cast<const core::HashJoinNode>(plan);
VELOX_CHECK_NOT_NULL(joinNode);

Expand All @@ -888,7 +902,7 @@ void JoinFuzzer::makeAlternativePlans(
.localPartitionRoundRobin(
makeSources(buildInput, planNodeIdGenerator))
.planNode(),
/*filter=*/"",
filter,
outputColumns,
joinType,
joinNode->isNullAware())
Expand All @@ -897,16 +911,32 @@ void JoinFuzzer::makeAlternativePlans(
// Use OrderBy + MergeJoin
if (core::MergeJoinNode::isSupported(joinNode->joinType())) {
auto planWithSplits = makeMergeJoinPlan(
joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns);
joinType,
probeKeys,
buildKeys,
probeInput,
buildInput,
outputColumns,
filter);
plans.push_back(planWithSplits);

addFlippedJoinPlan<core::MergeJoinNode>(planWithSplits.plan, plans);
}

// Use NestedLoopJoin.
if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) {
std::string joinCondition = filter.empty()
? makeJoinFilter(probeKeys, buildKeys)
: fmt::format(
"{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter);
auto planWithSplits = makeNestedLoopJoinPlan(
joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns);
joinType,
probeKeys,
buildKeys,
probeInput,
buildInput,
outputColumns,
joinCondition);
plans.push_back(planWithSplits);

addFlippedJoinPlan<core::NestedLoopJoinNode>(planWithSplits.plan, plans);
Expand Down Expand Up @@ -957,7 +987,7 @@ RowVectorPtr JoinFuzzer::testCrossProduct(
probeInput,
buildInput,
outputColumns,
/*withFilter*/ false);
/*filter=*/"");
const auto expected = execute(plan, /*injectSpill=*/false);

// If OOM injection is not enabled verify the results against Reference query
Expand Down Expand Up @@ -992,7 +1022,7 @@ RowVectorPtr JoinFuzzer::testCrossProduct(
probeScanSplits,
buildScanSplits,
outputColumns,
/*withFilter*/ false));
/*filter=*/""));
}
addFlippedJoinPlan<core::NestedLoopJoinNode>(plan.plan, altPlans);

Expand All @@ -1011,10 +1041,27 @@ void JoinFuzzer::verify(core::JoinType joinType) {
const bool nullAware =
isNullAwareSupported(joinType) && vectorFuzzer_.coinToss(0.5);

const auto numKeys = nullAware ? 1 : randInt(1, 5);
// Add boolean/integer join filter 10% of the time.
const bool withFilter = vectorFuzzer_.coinToss(0.1);
// Null-aware joins allow only one join key.
const int numKeys = nullAware ? (withFilter ? 0 : 1) : randInt(1, 5);
std::vector<TypePtr> keyTypes = generateJoinKeyTypes(numKeys);
std::string filter;

if (withFilter) {
if (vectorFuzzer_.coinToss(0.5)) {
keyTypes.push_back(BOOLEAN());
filter = vectorFuzzer_.coinToss(0.5)
? fmt::format("t{} = true", keyTypes.size() - 1)
: fmt::format("u{} = true", keyTypes.size() - 1);
} else {
keyTypes.push_back(INTEGER());
filter = vectorFuzzer_.coinToss(0.5)
? fmt::format("t{} % {} = 0", keyTypes.size() - 1, randInt(1, 9))
: fmt::format("u{} % {} = 0", keyTypes.size() - 1, randInt(1, 9));
}
}

// Pick number and types of join keys.
const std::vector<TypePtr> keyTypes = generateJoinKeyTypes(numKeys);
std::vector<std::string> probeKeys = makeNames("t", keyTypes.size());
std::vector<std::string> buildKeys = makeNames("u", keyTypes.size());

Expand Down Expand Up @@ -1094,7 +1141,8 @@ void JoinFuzzer::verify(core::JoinType joinType) {
buildKeys,
probeInput,
buildInput,
outputColumns);
outputColumns,
filter);

const auto expected = execute(defaultPlan, /*injectSpill=*/false);

Expand All @@ -1110,7 +1158,7 @@ void JoinFuzzer::verify(core::JoinType joinType) {
{expected}),
"Velox and Reference results don't match");

LOG(INFO) << "Result matches with referenc DB.";
LOG(INFO) << "Result matches with reference DB.";
stats_.numVerified++;
}
}
Expand All @@ -1123,11 +1171,13 @@ void JoinFuzzer::verify(core::JoinType joinType) {
buildKeys,
flatProbeInput,
flatBuildInput,
outputColumns));
outputColumns,
filter));

makeAlternativePlans(defaultPlan.plan, probeInput, buildInput, altPlans);
makeAlternativePlans(
defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans);
defaultPlan.plan, probeInput, buildInput, altPlans, filter);
makeAlternativePlans(
defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans, filter);

addPlansWithTableScan(
tableScanDir->getPath(),
Expand All @@ -1138,7 +1188,8 @@ void JoinFuzzer::verify(core::JoinType joinType) {
flatProbeInput,
flatBuildInput,
outputColumns,
altPlans);
altPlans,
filter);

for (auto i = 0; i < altPlans.size(); ++i) {
LOG(INFO) << "Testing plan #" << i;
Expand Down Expand Up @@ -1190,7 +1241,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan(
const std::vector<std::string>& buildKeys,
const std::vector<Split>& probeSplits,
const std::vector<Split>& buildSplits,
const std::vector<std::string>& outputColumns) {
const std::vector<std::string>& outputColumns,
const std::string& filter) {
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId probeScanId;
core::PlanNodeId buildScanId;
Expand All @@ -1208,7 +1260,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan(
.capturePlanNodeId(buildScanId)
.orderBy(buildKeys, false)
.planNode(),
/*filter=*/"",
filter,
outputColumns,
joinType)
.planNode(),
Expand All @@ -1226,13 +1278,11 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan(
const std::vector<Split>& probeSplits,
const std::vector<Split>& buildSplits,
const std::vector<std::string>& outputColumns,
bool withFilter) {
const std::string& filter) {
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId probeScanId;
core::PlanNodeId buildScanId;

const std::string filter =
withFilter ? makeJoinFilter(probeKeys, buildKeys) : "";
return JoinFuzzer::PlanWithSplits{
PlanBuilder(planNodeIdGenerator)
.tableScan(probeType)
Expand Down Expand Up @@ -1260,7 +1310,8 @@ void JoinFuzzer::addPlansWithTableScan(
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
const std::vector<std::string>& outputColumns,
std::vector<PlanWithSplits>& altPlans) {
std::vector<PlanWithSplits>& altPlans,
const std::string& filter) {
VELOX_CHECK(!tableDir.empty());

if (!isTableScanSupported(probeInput[0]->type()) ||
Expand All @@ -1286,7 +1337,8 @@ void JoinFuzzer::addPlansWithTableScan(
buildKeys,
probeScanSplits,
buildScanSplits,
outputColumns);
outputColumns,
filter);
plansWithTableScan.push_back(defaultPlan);

auto joinNode =
Expand Down Expand Up @@ -1336,7 +1388,8 @@ void JoinFuzzer::addPlansWithTableScan(
buildKeys,
probeScanSplits,
buildScanSplits,
outputColumns);
outputColumns,
filter);
altPlans.push_back(planWithSplits);

addFlippedJoinPlan<core::MergeJoinNode>(
Expand All @@ -1350,6 +1403,10 @@ void JoinFuzzer::addPlansWithTableScan(

// Add ungrouped NestedLoopJoin with TableScan.
if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) {
std::string joinCondition = filter.empty()
? makeJoinFilter(probeKeys, buildKeys)
: fmt::format(
"{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter);
auto planWithSplits = makeNestedLoopJoinPlanWithTableScan(
joinType,
probeType,
Expand All @@ -1358,7 +1415,8 @@ void JoinFuzzer::addPlansWithTableScan(
buildKeys,
probeScanSplits,
buildScanSplits,
outputColumns);
outputColumns,
joinCondition);
altPlans.push_back(planWithSplits);

addFlippedJoinPlan<core::NestedLoopJoinNode>(
Expand Down

0 comments on commit 3ea4a51

Please sign in to comment.