diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index ee7a590e1f0a..77d61c4072ae 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -115,6 +115,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case GROUPING_SETS: case WINDOW_FUNCTIONS: case UNNEST: + case ALLOW_TOP_LEVEL_UNION_ALL: return false; case CAN_SELECT: case CAN_INSERT: diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index b3bf3c408a29..99a0e365394f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -27,6 +27,7 @@ import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault; import org.apache.druid.msq.indexing.error.InsertTimeNullFault; import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault; +import org.apache.druid.msq.indexing.error.QueryNotSupportedFault; import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault; import org.apache.druid.msq.indexing.error.TooManyColumnsFault; import org.apache.druid.msq.indexing.error.TooManyInputFilesFault; @@ -356,4 +357,19 @@ public void testTooManyInputFiles() throws IOException .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2)) .verifyResults(); } + + @Test + public void testUnionAllIsDisallowed() + { + final RowSignature rowSignature = + RowSignature.builder().add("__time", ColumnType.LONG).build(); + testIngestQuery() + 
.setSql("SELECT * FROM foo\n" + + "UNION ALL\n" + + "SELECT * FROM foo\n") + .setExpectedRowSignature(rowSignature) + .setExpectedDataSource("foo1") + .setExpectedMSQFault(QueryNotSupportedFault.instance()) + .verifyResults(); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 9ebcb2ec53e3..4378ed847619 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -1386,9 +1386,11 @@ public Pair, List>> public void verifyResults() { - Preconditions.checkArgument(expectedResultRows != null, "Result rows cannot be null"); - Preconditions.checkArgument(expectedRowSignature != null, "Row signature cannot be null"); - Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec cannot be null "); + if (expectedMSQFault == null) { + Preconditions.checkArgument(expectedResultRows != null, "Result rows cannot be null"); + Preconditions.checkArgument(expectedRowSignature != null, "Row signature cannot be null"); + Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec cannot be null "); + } Pair, List>> specAndResults = runQueryWithResult(); if (specAndResults == null) { // A fault was expected and the assertion has been done in the runQueryWithResult diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java index 526eb6a976c0..b1a6a557f761 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java @@ -97,7 +97,7 @@ public static List rules(PlannerContext plannerContext) DruidOuterQueryRule.SORT, new DruidUnionRule(plannerContext), new 
DruidUnionDataSourceRule(plannerContext), - DruidSortUnionRule.instance(), + new DruidSortUnionRule(plannerContext), DruidJoinRule.instance(plannerContext) ) ); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidSortUnionRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidSortUnionRule.java index daf1162ac44d..f316ab9f0487 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidSortUnionRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidSortUnionRule.java @@ -23,7 +23,9 @@ import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rex.RexLiteral; +import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.DruidUnionRel; +import org.apache.druid.sql.calcite.run.EngineFeature; import java.util.Collections; @@ -32,21 +34,27 @@ */ public class DruidSortUnionRule extends RelOptRule { - private static final DruidSortUnionRule INSTANCE = new DruidSortUnionRule(); + private final PlannerContext plannerContext; - private DruidSortUnionRule() + public DruidSortUnionRule(PlannerContext plannerContext) { super(operand(Sort.class, operand(DruidUnionRel.class, any()))); - } - - public static DruidSortUnionRule instance() - { - return INSTANCE; + this.plannerContext = plannerContext; } @Override public boolean matches(final RelOptRuleCall call) { + // Defensive check. 
If the planner disallows top level union all, then the DruidUnionRule would have prevented + // creating the DruidUnionRel in the first place + if (!plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) { + plannerContext.setPlanningError( + "Top level 'UNION ALL' is unsupported by SQL engine [%s].", + plannerContext.getEngine().name() + ); + return false; + } + // LIMIT, no ORDER BY final Sort sort = call.rel(0); return sort.collation.getFieldCollations().isEmpty() && sort.fetch != null; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRule.java index 99f6248b37d5..e4a72776315d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRule.java @@ -112,7 +112,12 @@ public void onMatch(final RelOptRuleCall call) // Can only do UNION ALL of inputs that have compatible schemas (or schema mappings) and right side // is a simple table scan - public static boolean isCompatible(final Union unionRel, final DruidRel first, final DruidRel second, @Nullable PlannerContext plannerContext) + public static boolean isCompatible( + final Union unionRel, + final DruidRel first, + final DruidRel second, + @Nullable PlannerContext plannerContext + ) { if (!(second instanceof DruidQueryRel)) { return false; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionRule.java index 40cb2161c155..38a5c2a5aa50 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionRule.java @@ -23,9 +23,11 @@ import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Union; +import 
org.apache.druid.sql.calcite.external.ExternalOperatorConversion; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.DruidRel; import org.apache.druid.sql.calcite.rel.DruidUnionRel; +import org.apache.druid.sql.calcite.run.EngineFeature; import java.util.List; @@ -51,6 +53,13 @@ public DruidUnionRule(PlannerContext plannerContext) @Override public boolean matches(RelOptRuleCall call) { + if (!plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) { + plannerContext.setPlanningError( + "Top level 'UNION ALL' is unsupported by SQL engine [%s].", + plannerContext.getEngine().name() + ); + return false; + } // Make DruidUnionRule and DruidUnionDataSourceRule mutually exclusive. final Union unionRel = call.rel(0); final DruidRel firstDruidRel = call.rel(1); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java index 94827c2955da..778c7ec03b6f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java @@ -102,5 +102,21 @@ public enum EngineFeature * that it actually *does* generate correct results in native when the join is processed on the Broker. It is much * less likely that MSQ will plan in such a way that generates correct results. */ - ALLOW_BROADCAST_RIGHTY_JOIN; + ALLOW_BROADCAST_RIGHTY_JOIN, + + /** + * Planner is permitted to use {@link org.apache.druid.sql.calcite.rel.DruidUnionRel} to plan the top level UNION ALL. + * This is to dissuade planner from accepting and running the UNION ALL queries that are not supported by engines + * (primarily MSQ). 
+ * + * Due to the nature of the execution of the top level UNION ALLs (we run the individual queries and concat the + * results), it only makes sense to enable this on engines where the queries return the results synchronously. + * + * Planning queries with top level UNION_ALL leads to undesirable behaviour with asynchronous engines like MSQ. + * To enumerate this behaviour for MSQ, the broker attempts to run the individual queries as MSQ queries in succession, + * submits the first query correctly, fails on the rest of the queries (due to conflicting taskIds), + * and cannot concat the results together (as the result for broker is the query id). Therefore, we don't get the + * correct result back, while the MSQ engine is executing the partial query. + */ + ALLOW_TOP_LEVEL_UNION_ALL; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index d7fc7d043b6f..164e02a0ca8d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -105,6 +105,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case WINDOW_FUNCTIONS: case UNNEST: case ALLOW_BROADCAST_RIGHTY_JOIN: + case ALLOW_TOP_LEVEL_UNION_ALL: return true; case TIME_BOUNDARY_QUERY: return plannerContext.queryContext().isTimeBoundaryPlanningEnabled();