Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla committed Sep 13, 2023
1 parent 4c57504 commit bc40808
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon
case GROUPING_SETS:
case WINDOW_FUNCTIONS:
case UNNEST:
case ALLOW_TOP_LEVEL_UNION_ALL:
return false;
case CAN_SELECT:
case CAN_INSERT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
Expand Down Expand Up @@ -356,4 +357,19 @@ public void testTooManyInputFiles() throws IOException
.setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2))
.verifyResults();
}

@Test
public void testUnionAllIsDisallowed()
{
final RowSignature rowSignature =
RowSignature.builder().add("__time", ColumnType.LONG).build();
testIngestQuery()
.setSql("SELECT * FROM foo\n"
+ "UNION ALL\n"
+ "SELECT * FROM foo\n")
.setExpectedRowSignature(rowSignature)
.setExpectedDataSource("foo1")
.setExpectedMSQFault(QueryNotSupportedFault.instance())
.verifyResults();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1386,9 +1386,11 @@ public Pair<MSQSpec, Pair<List<MSQResultsReport.ColumnAndType>, List<Object[]>>>

public void verifyResults()
{
Preconditions.checkArgument(expectedResultRows != null, "Result rows cannot be null");
Preconditions.checkArgument(expectedRowSignature != null, "Row signature cannot be null");
Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec cannot be null ");
if (expectedMSQFault == null) {
Preconditions.checkArgument(expectedResultRows != null, "Result rows cannot be null");
Preconditions.checkArgument(expectedRowSignature != null, "Row signature cannot be null");
Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec cannot be null ");
}
Pair<MSQSpec, Pair<List<MSQResultsReport.ColumnAndType>, List<Object[]>>> specAndResults = runQueryWithResult();

if (specAndResults == null) { // A fault was expected and the assertion has been done in the runQueryWithResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static List<RelOptRule> rules(PlannerContext plannerContext)
DruidOuterQueryRule.SORT,
new DruidUnionRule(plannerContext),
new DruidUnionDataSourceRule(plannerContext),
DruidSortUnionRule.instance(),
new DruidSortUnionRule(plannerContext),
DruidJoinRule.instance(plannerContext)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexLiteral;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidUnionRel;
import org.apache.druid.sql.calcite.run.EngineFeature;

import java.util.Collections;

Expand All @@ -32,21 +34,27 @@
*/
public class DruidSortUnionRule extends RelOptRule
{
private static final DruidSortUnionRule INSTANCE = new DruidSortUnionRule();
private final PlannerContext plannerContext;

private DruidSortUnionRule()
public DruidSortUnionRule(PlannerContext plannerContext)
{
super(operand(Sort.class, operand(DruidUnionRel.class, any())));
}

public static DruidSortUnionRule instance()
{
return INSTANCE;
this.plannerContext = plannerContext;
}

@Override
public boolean matches(final RelOptRuleCall call)
{
// Defensive check. If the planner disallows top level union all, then the DruidUnionRule would have prevented
// creating the DruidUnionRel in the first place
if (!plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) {
plannerContext.setPlanningError(
"Top level 'UNION ALL' is unsupported by SQL engine [%s].",
plannerContext.getEngine().name()
);
return false;
}

// LIMIT, no ORDER BY
final Sort sort = call.rel(0);
return sort.collation.getFieldCollations().isEmpty() && sort.fetch != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ public void onMatch(final RelOptRuleCall call)

// Can only do UNION ALL of inputs that have compatible schemas (or schema mappings) and right side
// is a simple table scan
public static boolean isCompatible(final Union unionRel, final DruidRel<?> first, final DruidRel<?> second, @Nullable PlannerContext plannerContext)
public static boolean isCompatible(
final Union unionRel,
final DruidRel<?> first,
final DruidRel<?> second,
@Nullable PlannerContext plannerContext
)
{
if (!(second instanceof DruidQueryRel)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Union;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.DruidUnionRel;
import org.apache.druid.sql.calcite.run.EngineFeature;

import java.util.List;

Expand All @@ -51,6 +53,13 @@ public DruidUnionRule(PlannerContext plannerContext)
@Override
public boolean matches(RelOptRuleCall call)
{
if (!plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) {
plannerContext.setPlanningError(
"Top level 'UNION ALL' is unsupported by SQL engine [%s].",
plannerContext.getEngine().name()
);
return false;
}
// Make DruidUnionRule and DruidUnionDataSourceRule mutually exclusive.
final Union unionRel = call.rel(0);
final DruidRel<?> firstDruidRel = call.rel(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,21 @@ public enum EngineFeature
* that it actually *does* generate correct results in native when the join is processed on the Broker. It is much
* less likely that MSQ will plan in such a way that generates correct results.
*/
ALLOW_BROADCAST_RIGHTY_JOIN;
ALLOW_BROADCAST_RIGHTY_JOIN,

/**
* Planner is permitted to use {@link org.apache.druid.sql.calcite.rel.DruidUnionRel} to plan the top level UNION ALL.
* This is to dissuade planner from accepting and running the UNION ALL queries that are not supported by engines
* (primarily MSQ).
*
* Due to the nature of the exeuction of the top level UNION ALLs (we run the individual queries and concat the
* results), it only makes sense to enable this on engines where the queries return the results synchronously
*
* Planning queries with top level UNION_ALL leads to undesirable behaviour with asynchronous engines like MSQ.
* To enumerate this behaviour for MSQ, the broker attempts to run the individual queries as MSQ queries in succession,
* submits the first query correctly, fails on the rest of the queries (due to conflicting taskIds),
* and cannot concat the results together (as * the result for broker is the query id). Therefore, we don't get the
* correct result back, while the MSQ engine is executing the partial query
*/
ALLOW_TOP_LEVEL_UNION_ALL;
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon
case WINDOW_FUNCTIONS:
case UNNEST:
case ALLOW_BROADCAST_RIGHTY_JOIN:
case ALLOW_TOP_LEVEL_UNION_ALL:
return true;
case TIME_BOUNDARY_QUERY:
return plannerContext.queryContext().isTimeBoundaryPlanningEnabled();
Expand Down

0 comments on commit bc40808

Please sign in to comment.