From 7e60702624c353bbf6a56212a8f669cad6b69280 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 27 Sep 2023 10:56:33 +0530 Subject: [PATCH] changes --- .../druid/msq/querykit/DataSourcePlan.java | 61 ++ .../msq/test/CalciteSelectQueryMSQTest.java | 12 - .../msq/test/CalciteUnionQueryMSQTest.java | 112 ++++ .../apache/druid/query/UnionDataSource.java | 44 +- .../apache/druid/query/UnionQueryRunner.java | 10 +- .../apache/druid/query/DataSourceTest.java | 2 +- .../druid/query/UnionDataSourceTest.java | 2 +- .../calcite/rel/DruidUnionDataSourceRel.java | 2 +- .../druid/sql/calcite/CalciteQueryTest.java | 525 ----------------- .../sql/calcite/CalciteUnionQueryTest.java | 546 ++++++++++++++++++ 10 files changed, 757 insertions(+), 559 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java create mode 100644 sql/src/test/java/org/apache/druid/sql/calcite/CalciteUnionQueryTest.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index d8481bf7a094..16eaef63c497 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -51,6 +51,7 @@ import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.planning.DataSourceAnalysis; @@ -170,6 +171,18 @@ public static DataSourcePlan forDataSource( minStageNumber, broadcast ); + } else if (dataSource instanceof UnionDataSource) { + return forUnion( + queryKit, + queryId, + queryContext, + (UnionDataSource) dataSource, + querySegmentSpec, + filter, + maxWorkerCount, + minStageNumber, + broadcast + ); } else if (dataSource instanceof JoinDataSource) { final JoinAlgorithm preferredJoinAlgorithm = PlannerContext.getJoinAlgorithm(queryContext); final JoinAlgorithm deducedJoinAlgorithm = deduceJoinAlgorithm( @@ -458,6 +471,54 @@ private static DataSourcePlan forUnnest( ); } + private static DataSourcePlan forUnion( + final QueryKit queryKit, + final String queryId, + final QueryContext queryContext, + final UnionDataSource unionDataSource, + final QuerySegmentSpec querySegmentSpec, + @Nullable DimFilter filter, + final int maxWorkerCount, + final int minStageNumber, + final boolean broadcast + ) + { + // This is done to prevent loss of generality since MSQ can plan any type of DataSource. + List children = unionDataSource.getDataSources(); + + final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder(); + final List newChildren = new ArrayList<>(); + final List inputSpecs = new ArrayList<>(); + final IntSet broadcastInputs = new IntOpenHashSet(); + + for (DataSource child : children) { + DataSourcePlan childDataSourcePlan = forDataSource( + queryKit, + queryId, + queryContext, + child, + querySegmentSpec, + filter, + maxWorkerCount, + Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()), + broadcast + ); + + int shift = inputSpecs.size(); + + newChildren.add(shiftInputNumbers(childDataSourcePlan.getNewDataSource(), shift)); + inputSpecs.addAll(childDataSourcePlan.getInputSpecs()); + childDataSourcePlan.getSubQueryDefBuilder().ifPresent(subqueryDefBuilder::addAll); + childDataSourcePlan.getBroadcastInputs().forEach(inp -> broadcastInputs.add(inp + shift)); + } + return new DataSourcePlan( + new UnionDataSource(newChildren), + inputSpecs, + broadcastInputs, + subqueryDefBuilder + ); + } + /** * Build a plan for broadcast hash-join. */ diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java index 7ca1137d459e..f9f9986a0317 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java @@ -175,16 +175,4 @@ public void testArrayAggQueryOnComplexDatatypes() } } - /** - * Doesn't pass through Druid however the planning error is different as it rewrites to a union datasource. - * This test is disabled because MSQ wants to support union datasources, and it makes little sense to add highly - * conditional planning error for the same. Planning errors are merely hints, and this is one of those times - * when the hint is incorrect till MSQ starts supporting the union datasource. - */ - @Test - @Override - public void testUnionIsUnplannable() - { - - } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java new file mode 100644 index 000000000000..9f688660e169 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Injector; +import com.google.inject.Module; +import org.apache.druid.guice.DruidInjectorBuilder; +import org.apache.druid.msq.exec.WorkerMemoryParameters; +import org.apache.druid.msq.sql.MSQTaskSqlEngine; +import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.sql.calcite.BaseCalciteQueryTest; +import org.apache.druid.sql.calcite.CalciteUnionQueryTest; +import org.apache.druid.sql.calcite.QueryTestBuilder; +import org.apache.druid.sql.calcite.run.SqlEngine; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Runs {@link CalciteUnionQueryTest} but with MSQ engine + */ +public class CalciteUnionQueryMSQTest extends CalciteUnionQueryTest +{ + private TestGroupByBuffers groupByBuffers; + + @Before + public void setup2() + { + groupByBuffers = TestGroupByBuffers.createDefault(); + } + + @After + public void teardown2() + { + groupByBuffers.close(); + } + + @Override + public void configureGuice(DruidInjectorBuilder builder) + { + super.configureGuice(builder); + builder.addModules(CalciteMSQTestsHelper.fetchModules(temporaryFolder, groupByBuffers).toArray(new Module[0])); + } + + + @Override + public SqlEngine createEngine( + QueryLifecycleFactory qlf, + ObjectMapper queryJsonMapper, + Injector injector + ) + { + final WorkerMemoryParameters workerMemoryParameters = + WorkerMemoryParameters.createInstance( + WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50, + 2, + 10, + 2, + 0, + 0 + ); + final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient( + queryJsonMapper, + injector, + new MSQTestTaskActionClient(queryJsonMapper), + workerMemoryParameters + ); + return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper); + } + + @Override + protected QueryTestBuilder testBuilder() + { + return new QueryTestBuilder(new BaseCalciteQueryTest.CalciteTestConfig(true)) + .addCustomRunner(new ExtractResultsFactory(() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient())) + .skipVectorize(true) + .verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate()) + .msqCompatible(msqCompatible); + } + + /** + * Doesn't pass through Druid however the planning error is different as it rewrites to a union datasource. + * This test is disabled because MSQ wants to support union datasources, and it makes little sense to add highly + * conditional planning error for the same. Planning errors are merely hints, and this is one of those times + * when the hint is incorrect till MSQ starts supporting the union datasource. + */ + @Test + @Override + public void testUnionIsUnplannable() + { + + } +} diff --git a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java index 3f538f5ad5aa..37e37c64ec99 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.planning.DataSourceAnalysis; @@ -36,13 +37,16 @@ import java.util.function.Function; import java.util.stream.Collectors; +/** + * TODO(laksh): + */ public class UnionDataSource implements DataSource { @JsonProperty - private final List dataSources; + private final List dataSources; @JsonCreator - public UnionDataSource(@JsonProperty("dataSources") List dataSources) + public UnionDataSource(@JsonProperty("dataSources") List dataSources) { if (dataSources == null || dataSources.isEmpty()) { throw new ISE("'dataSources' must be non-null and non-empty for 'union'"); @@ -51,18 +55,38 @@ public UnionDataSource(@JsonProperty("dataSources") List dataSo this.dataSources = dataSources; } + public List getDataSources() + { + return dataSources; + } + + + // TODO: native only method @Override public Set getTableNames() { return dataSources.stream() - .map(input -> Iterables.getOnlyElement(input.getTableNames())) + .map(input -> { + if (!(input instanceof TableDataSource)) { + throw DruidException.defensive("should be table"); + } + return Iterables.getOnlyElement(input.getTableNames()); + }) .collect(Collectors.toSet()); } + // TODO: native only method @JsonProperty - public List getDataSources() + public List getDataSourcesAsTableDataSources() { - return dataSources; + return dataSources.stream() + .map(input -> { + if (!(input instanceof TableDataSource)) { + throw DruidException.defensive("should be table"); + } + return (TableDataSource) input; + }) + .collect(Collectors.toList()); } @Override @@ -78,10 +102,6 @@ public DataSource withChildren(List children) throw new IAE("Expected [%d] children, got [%d]", dataSources.size(), children.size()); } - if (!children.stream().allMatch(dataSource -> dataSource instanceof TableDataSource)) { - throw new IAE("All children must be tables"); - } - return new UnionDataSource( children.stream().map(dataSource -> (TableDataSource) dataSource).collect(Collectors.toList()) ); @@ -149,11 +169,7 @@ public boolean equals(Object o) UnionDataSource that = (UnionDataSource) o; - if (!dataSources.equals(that.dataSources)) { - return false; - } - - return true; + return dataSources.equals(that.dataSources); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java index aeb3897e644b..5459e1d8c22e 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java @@ -57,16 +57,16 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext respo final UnionDataSource unionDataSource = analysis.getBaseUnionDataSource().get(); - if (unionDataSource.getDataSources().isEmpty()) { + if (unionDataSource.getDataSourcesAsTableDataSources().isEmpty()) { // Shouldn't happen, because UnionDataSource doesn't allow empty unions. throw new ISE("Unexpectedly received empty union"); - } else if (unionDataSource.getDataSources().size() == 1) { + } else if (unionDataSource.getDataSourcesAsTableDataSources().size() == 1) { // Single table. Run as a normal query. return baseRunner.run( queryPlus.withQuery( Queries.withBaseDataSource( query, - Iterables.getOnlyElement(unionDataSource.getDataSources()) + Iterables.getOnlyElement(unionDataSource.getDataSourcesAsTableDataSources()) ) ), responseContext @@ -77,8 +77,8 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext respo query.getResultOrdering(), Sequences.simple( Lists.transform( - IntStream.range(0, unionDataSource.getDataSources().size()) - .mapToObj(i -> new Pair<>(unionDataSource.getDataSources().get(i), i + 1)) + IntStream.range(0, unionDataSource.getDataSourcesAsTableDataSources().size()) + .mapToObj(i -> new Pair<>(unionDataSource.getDataSourcesAsTableDataSources().get(i), i + 1)) .collect(Collectors.toList()), (Function, Sequence>) singleSourceWithIndex -> baseRunner.run( diff --git a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java index 7c7f50f281bb..e7850953a609 100644 --- a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java @@ -89,7 +89,7 @@ public void testUnionDataSource() throws Exception Assert.assertTrue(dataSource instanceof UnionDataSource); Assert.assertEquals( Lists.newArrayList(new TableDataSource("ds1"), new TableDataSource("ds2")), - Lists.newArrayList(((UnionDataSource) dataSource).getDataSources()) + Lists.newArrayList(((UnionDataSource) dataSource).getDataSourcesAsTableDataSources()) ); Assert.assertEquals( ImmutableSet.of("ds1", "ds2"), diff --git a/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java index f408e71abf23..12522df08df3 100644 --- a/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java @@ -123,7 +123,7 @@ public void test_withChildren_empty() @Test public void test_withChildren_sameNumber() { - final List newDataSources = ImmutableList.of( + final List newDataSources = ImmutableList.of( new TableDataSource("baz"), new TableDataSource("qux") ); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java index 5e213de711cc..dbbcfa0f9a3b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java @@ -118,7 +118,7 @@ public DruidUnionDataSourceRel withPartialQuery(final PartialDruidQuery newQuery @Override public DruidQuery toDruidQuery(final boolean finalizeAggregations) { - final List dataSources = new ArrayList<>(); + final List dataSources = new ArrayList<>(); RowSignature signature = null; for (final RelNode relNode : unionRel.getInputs()) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 2d6dc078b9a6..9ec5a268e1c3 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -42,7 +42,6 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; -import org.apache.druid.query.UnionDataSource; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; @@ -2874,476 +2873,6 @@ public void testUnionAllQueriesWithLimit() ); } - @DecoupledIgnore - @Test - public void testUnionAllDifferentTablesWithMapping() - { - notMsqCompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE3) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 2.0, 2L}, - new Object[]{"1", "a", 8.0, 2L} - ) - ); - } - - @DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES) - @Test - public void testJoinUnionAllDifferentTablesWithMapping() - { - notMsqCompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE3) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 2.0, 2L}, - new Object[]{"1", "a", 8.0, 2L} - ) - ); - } - - @Test - public void testUnionAllTablesColumnCountMismatch() - { - try { - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM numfoo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of(), - ImmutableList.of() - ); - Assert.fail("query execution should fail"); - } - catch (DruidException e) { - MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [42])")); - } - } - - @DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES) - @Test - public void testUnionAllTablesColumnTypeMismatchFloatLong() - { - notMsqCompatible(); - // "m1" has a different type in foo and foo2 (float vs long), but this query is OK anyway because they can both - // be implicitly cast to double. - - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo2 UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'en'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE2), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("en", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 1.0, 1L}, - new Object[]{"1", "a", 4.0, 1L}, - new Object[]{"druid", "en", 1.0, 1L} - ) - ); - } - - @DecoupledIgnore(mode = Modes.ERROR_HANDLING) - @Test - public void testUnionAllTablesColumnTypeMismatchStringLong() - { - // "dim3" has a different type in foo and foo2 (string vs long), which requires a casting subquery, so this - // query cannot be planned. - - assertQueryIsUnplannable( - "SELECT\n" - + "dim3, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim3, dim2, m1 FROM foo2 UNION ALL SELECT dim3, dim2, m1 FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'en'\n" - + "GROUP BY 1, 2", - "SQL requires union between inputs that are not simple table scans and involve a " + - "filter or aliasing. Or column types of tables being unioned are not of same type." - ); - } - - @DecoupledIgnore(mode = Modes.ERROR_HANDLING) - @Test - public void testUnionAllTablesWhenMappingIsRequired() - { - // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. - - assertQueryIsUnplannable( - "SELECT\n" - + "c, COUNT(*)\n" - + "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT dim2 AS c, m1 FROM numfoo)\n" - + "WHERE c = 'a' OR c = 'def'\n" - + "GROUP BY 1", - "SQL requires union between two tables " + - "and column names queried for each table are different Left: [dim1], Right: [dim2]." - ); - } - - @DecoupledIgnore(mode = Modes.ERROR_HANDLING) - @Test - public void testUnionIsUnplannable() - { - // Cannot plan this UNION operation - assertQueryIsUnplannable( - "SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo", - "SQL requires 'UNION' but only 'UNION ALL' is supported." - ); - } - - @DecoupledIgnore(mode = Modes.ERROR_HANDLING) - @Test - public void testUnionAllTablesWhenCastAndMappingIsRequired() - { - // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. - assertQueryIsUnplannable( - "SELECT\n" - + "c, COUNT(*)\n" - + "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT cnt AS c, m1 FROM numfoo)\n" - + "WHERE c = 'a' OR c = 'def'\n" - + "GROUP BY 1", - "SQL requires union between inputs that are not simple table scans and involve " + - "a filter or aliasing. Or column types of tables being unioned are not of same type." - ); - } - - @DecoupledIgnore - @Test - public void testUnionAllSameTableTwice() - { - notMsqCompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 2.0, 2L}, - new Object[]{"1", "a", 8.0, 2L} - ) - ); - } - - @DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES) - @Test - public void testUnionAllSameTableTwiceWithSameMapping() - { - notMsqCompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 2.0, 2L}, - new Object[]{"1", "a", 8.0, 2L} - ) - ); - } - - @DecoupledIgnore(mode = Modes.ERROR_HANDLING) - @Test - public void testUnionAllSameTableTwiceWithDifferentMapping() - { - // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. - assertQueryIsUnplannable( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim2, dim1, m1 FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - "SQL requires union between two tables and column names queried for each table are different Left: [dim1, dim2, m1], Right: [dim2, dim1, m1]." - ); - } - @DecoupledIgnore - @Test - public void testUnionAllSameTableThreeTimes() - { - notMsqCompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 3.0, 3L}, - new Object[]{"1", "a", 12.0, 3L} - ) - ); - } - - @Test - public void testUnionAllThreeTablesColumnCountMismatch1() - { - try { - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of(), - ImmutableList.of() - ); - Assert.fail("query execution should fail"); - } - catch (DruidException e) { - MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])")); - } - } - - @Test - public void testUnionAllThreeTablesColumnCountMismatch2() - { - try { - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of(), - ImmutableList.of() - ); - Assert.fail("query execution should fail"); - } - catch (DruidException e) { - MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])")); - } - } - - @Test - public void testUnionAllThreeTablesColumnCountMismatch3() - { - try { - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * from numfoo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of(), - ImmutableList.of() - ); - Assert.fail("query execution should fail"); - } - catch (DruidException e) { - MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [70])")); - } - } - - @DecoupledIgnore - @Test - public void testUnionAllSameTableThreeTimesWithSameMapping() - { - notMsqCompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 3.0, 3L}, - new Object[]{"1", "a", 12.0, 3L} - ) - ); - } - @Test public void testPruneDeadAggregators() { @@ -7330,60 +6859,6 @@ public void testExactCountDistinctUsingSubquery() ); } - @DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES) - @Test - public void testExactCountDistinctUsingSubqueryOnUnionAllTables() - { - notMsqCompatible(); - testQuery( - "SELECT\n" - + " SUM(cnt),\n" - + " COUNT(*)\n" - + "FROM (\n" - + " SELECT dim2, SUM(cnt) AS cnt\n" - + " FROM (SELECT * FROM druid.foo UNION ALL SELECT * FROM druid.foo)\n" - + " GROUP BY dim2\n" - + ")", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new QueryDataSource( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) - .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt"))) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setAggregatorSpecs(aggregators( - new LongSumAggregatorFactory("_a0", "a0"), - new CountAggregatorFactory("_a1") - )) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - NullHandling.replaceWithDefault() ? - ImmutableList.of( - new Object[]{12L, 3L} - ) : - ImmutableList.of( - new Object[]{12L, 4L} - ) - ); - } - @Test public void testAvgDailyCountDistinct() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteUnionQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteUnionQueryTest.java new file mode 100644 index 000000000000..1e1ec3d6986c --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteUnionQueryTest.java @@ -0,0 +1,546 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.hamcrest.MatcherAssert; +import org.junit.Assert; +import org.junit.Test; + +public class CalciteUnionQueryTest extends BaseCalciteQueryTest +{ + @Test + public void testUnionAllDifferentTablesWithMapping() + { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE3) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 2.0, 2L}, + new Object[]{"1", "a", 8.0, 2L} + ) + ); + } + + @Test + public void testJoinUnionAllDifferentTablesWithMapping() + { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE3) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 2.0, 2L}, + new Object[]{"1", "a", 8.0, 2L} + ) + ); + } + + @Test + public void testUnionAllTablesColumnCountMismatch() + { + try { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM numfoo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of(), + ImmutableList.of() + ); + Assert.fail("query execution should fail"); + } + catch (DruidException e) { + MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [42])")); + } + } + + @Test + public void testUnionAllTablesColumnTypeMismatchFloatLong() + { + // "m1" has a different type in foo and foo2 (float vs long), but this query is OK anyway because they can both + // be implicitly cast to double. + + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo2 UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'en'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE2), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("en", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 1.0, 1L}, + new Object[]{"1", "a", 4.0, 1L}, + new Object[]{"druid", "en", 1.0, 1L} + ) + ); + } + + @Test + public void testUnionAllTablesColumnTypeMismatchStringLong() + { + // "dim3" has a different type in foo and foo2 (string vs long), which requires a casting subquery, so this + // query cannot be planned. + + assertQueryIsUnplannable( + "SELECT\n" + + "dim3, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim3, dim2, m1 FROM foo2 UNION ALL SELECT dim3, dim2, m1 FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'en'\n" + + "GROUP BY 1, 2", + "SQL requires union between inputs that are not simple table scans and involve a " + + "filter or aliasing. Or column types of tables being unioned are not of same type." + ); + } + + @Test + public void testUnionAllTablesWhenMappingIsRequired() + { + // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. + + assertQueryIsUnplannable( + "SELECT\n" + + "c, COUNT(*)\n" + + "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT dim2 AS c, m1 FROM numfoo)\n" + + "WHERE c = 'a' OR c = 'def'\n" + + "GROUP BY 1", + "SQL requires union between two tables " + + "and column names queried for each table are different Left: [dim1], Right: [dim2]." + ); + } + + @Test + public void testUnionIsUnplannable() + { + // Cannot plan this UNION operation + assertQueryIsUnplannable( + "SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo", + "SQL requires 'UNION' but only 'UNION ALL' is supported." + ); + } + + @Test + public void testUnionAllTablesWhenCastAndMappingIsRequired() + { + // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. + assertQueryIsUnplannable( + "SELECT\n" + + "c, COUNT(*)\n" + + "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT cnt AS c, m1 FROM numfoo)\n" + + "WHERE c = 'a' OR c = 'def'\n" + + "GROUP BY 1", + "SQL requires union between inputs that are not simple table scans and involve " + + "a filter or aliasing. Or column types of tables being unioned are not of same type." + ); + } + + @Test + public void testUnionAllSameTableTwice() + { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 2.0, 2L}, + new Object[]{"1", "a", 8.0, 2L} + ) + ); + } + + @Test + public void testUnionAllSameTableTwiceWithSameMapping() + { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 2.0, 2L}, + new Object[]{"1", "a", 8.0, 2L} + ) + ); + } + + @Test + public void testUnionAllSameTableTwiceWithDifferentMapping() + { + // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. + assertQueryIsUnplannable( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim2, dim1, m1 FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + "SQL requires union between two tables and column names queried for each table are different Left: [dim1, dim2, m1], Right: [dim2, dim1, m1]." + ); + } + + @Test + public void testUnionAllSameTableThreeTimes() + { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 3.0, 3L}, + new Object[]{"1", "a", 12.0, 3L} + ) + ); + } + + @Test + public void testUnionAllThreeTablesColumnCountMismatch1() + { + try { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of(), + ImmutableList.of() + ); + Assert.fail("query execution should fail"); + } + catch (DruidException e) { + MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])")); + } + } + + @Test + public void testUnionAllThreeTablesColumnCountMismatch2() + { + try { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of(), + ImmutableList.of() + ); + Assert.fail("query execution should fail"); + } + catch (DruidException e) { + MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])")); + } + } + + @Test + public void testUnionAllThreeTablesColumnCountMismatch3() + { + try { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * from numfoo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of(), + ImmutableList.of() + ); + Assert.fail("query execution should fail"); + } + catch (DruidException e) { + MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [70])")); + } + } + + @Test + public void testUnionAllSameTableThreeTimesWithSameMapping() + { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 3.0, 3L}, + new Object[]{"1", "a", 12.0, 3L} + ) + ); + } + + @Test + public void testExactCountDistinctUsingSubqueryOnUnionAllTables() + { + testQuery( + "SELECT\n" + + " SUM(cnt),\n" + + " COUNT(*)\n" + + "FROM (\n" + + " SELECT dim2, SUM(cnt) AS cnt\n" + + " FROM (SELECT * FROM druid.foo UNION ALL SELECT * FROM druid.foo)\n" + + " GROUP BY dim2\n" + + ")", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt"))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + new LongSumAggregatorFactory("_a0", "a0"), + new CountAggregatorFactory("_a1") + )) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.replaceWithDefault() ? + ImmutableList.of( + new Object[]{12L, 3L} + ) : + ImmutableList.of( + new Object[]{12L, 4L} + ) + ); + } + +}