Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla committed Oct 5, 2023
1 parent 0ee8d50 commit 92f35c0
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
Expand Down Expand Up @@ -361,32 +360,9 @@ public void testTooManyInputFiles() throws IOException
}

@Test
public void testUnionAllUsingUnionDataSourceDisallowed()
public void testUnionAllWithDifferentColumnNames()
{
final RowSignature rowSignature =
RowSignature.builder().add("__time", ColumnType.LONG).build();
// This plans the query using DruidUnionDataSourceRule since the DruidUnionDataSourceRule#isCompatible
// returns true (column names, types match, and it is a union on the table data sources).
// It gets planned correctly, however MSQ engine cannot plan the query correctly
testSelectQuery()
.setSql("SELECT * FROM foo\n"
+ "UNION ALL\n"
+ "SELECT * FROM foo\n")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQFault(QueryNotSupportedFault.instance())
.verifyResults();
}

@Test
public void testUnionAllUsingTopLevelUnionDisallowedWhilePlanning()
{
// This results in a planning error however the planning error isn't an accurate representation of the actual error.
// Calcite tries to plan the query using DruidUnionRule, which passes with native, however fails with MSQ (due to engine
// feature ALLOW_TOP_LEVEL_UNION_ALL being absent)
// Calcite then tries to write it using DruidUnionDataSourceRule. However, the isCompatible check fails because column
// names mismatch. But it sets the planning error with this mismatch, which can be misleading since native queries can
// plan fine using the DruidUnionRule (that's disabled in MSQ)
// Once MSQ is able to support union datasources, we'd be able to execute this query fine in MSQ
// This test fails till MSQ can support arbitrary column names and column types for UNION ALL
testIngestQuery()
.setSql(
"INSERT INTO druid.dst "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
Expand Down Expand Up @@ -1929,8 +1930,8 @@ public void testGroupByOnFooWithDurableStoragePathAssertions() throws IOExceptio
new ColumnMappings(ImmutableList.of(
new ColumnMapping("d0", "cnt"),
new ColumnMapping("a0", "cnt1")
)
))
)
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(isDurableStorageDestination()
? DurableStorageMSQDestination.INSTANCE
Expand Down Expand Up @@ -2322,6 +2323,64 @@ public void testSelectUnnestOnQueryFoo()
.verifyResults();
}

@Test
public void testUnionAllUsingUnionDataSource()
{

final RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.build();

final List<Object[]> results = ImmutableList.of(
new Object[]{946684800000L, ""},
new Object[]{946684800000L, ""},
new Object[]{946771200000L, "10.1"},
new Object[]{946771200000L, "10.1"},
new Object[]{946857600000L, "2"},
new Object[]{946857600000L, "2"},
new Object[]{978307200000L, "1"},
new Object[]{978307200000L, "1"},
new Object[]{978393600000L, "def"},
new Object[]{978393600000L, "def"},
new Object[]{978480000000L, "abc"},
new Object[]{978480000000L, "abc"}
);
// This plans the query using DruidUnionDataSourceRule since the DruidUnionDataSourceRule#isCompatible
// returns true (column names, types match, and it is a union on the table data sources).
// It gets planned correctly, however MSQ engine cannot plan the query correctly
testSelectQuery()
.setSql("SELECT __time, dim1 FROM foo\n"
+ "UNION ALL\n"
+ "SELECT __time, dim1 FROM foo\n")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQSpec(
MSQSpec.builder()
.query(newScanQueryBuilder()
.dataSource(new UnionDataSource(
ImmutableList.of(new TableDataSource("foo"), new TableDataSource("foo"))
))
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(defaultScanQueryContext(
context,
rowSignature
))
.columns(ImmutableList.of("__time", "dim1"))
.build())
.columnMappings(ColumnMappings.identity(rowSignature))
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(isDurableStorageDestination()
? DurableStorageMSQDestination.INSTANCE
: TaskReportMSQDestination.INSTANCE)
.build()
)
.setQueryContext(context)
.setExpectedResultRows(results)
.verifyResults();
}

@Nonnull
private List<Object[]> expectedMultiValueFooRowsGroup()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/**
Expand Down Expand Up @@ -122,6 +123,7 @@ public void testUnionIsUnplannable()

}

@Ignore("Ignored till MSQ can plan UNION ALL with any operand")
@Test
public void testUnionOnSubqueries()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,15 @@ public static List<RelOptRule> rules(PlannerContext plannerContext)
DruidOuterQueryRule.WHERE_FILTER,
DruidOuterQueryRule.SELECT_PROJECT,
DruidOuterQueryRule.SORT,
// new DruidUnionDataSourceRule(plannerContext),
new DruidFreeUnionDataSourceRule(plannerContext),
new DruidUnionDataSourceRule(plannerContext),
DruidJoinRule.instance(plannerContext)
)
);

if (plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) {
// if (plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) {
retVal.add(new DruidUnionRule(plannerContext));
retVal.add(DruidSortUnionRule.instance());
}
// }

if (plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) {
retVal.add(new DruidQueryRule<>(Window.class, PartialDruidQuery.Stage.WINDOW, PartialDruidQuery::withWindow));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.DruidUnionRel;
import org.apache.druid.sql.calcite.run.EngineFeature;

import java.util.List;

Expand All @@ -51,6 +52,10 @@ public DruidUnionRule(PlannerContext plannerContext)
@Override
public boolean matches(RelOptRuleCall call)
{
if (plannerContext != null && !plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) {
plannerContext.setPlanningError("Queries cannot be planned using top level union all");
return false;
}
// Make DruidUnionRule and DruidUnionDataSourceRule mutually exclusive.
final Union unionRel = call.rel(0);
final DruidRel<?> firstDruidRel = call.rel(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ public void testFilterAndGroupByLookupUsingJoinOperatorWithNotFilter(Map<String,
public void testJoinUnionTablesOnLookup(Map<String, Object> queryContext)
{
// MSQ does not support UNION ALL.
notMsqCompatible();
// notMsqCompatible();
// Cannot vectorize JOIN operator.
cannotVectorize();

Expand Down Expand Up @@ -4189,7 +4189,7 @@ public void testJoinOnMultiValuedColumnShouldThrowException(Map<String, Object>
public void testUnionAllTwoQueriesLeftQueryIsJoin(Map<String, Object> queryContext)
{
// MSQ does not support UNION ALL.
notMsqCompatible();
// notMsqCompatible();

// Fully removing the join allows this query to vectorize.
if (!isRewriteJoinToFilter(queryContext)) {
Expand Down Expand Up @@ -4233,7 +4233,7 @@ public void testUnionAllTwoQueriesLeftQueryIsJoin(Map<String, Object> queryConte
public void testUnionAllTwoQueriesRightQueryIsJoin(Map<String, Object> queryContext)
{
// MSQ does not support UNION ALL.
notMsqCompatible();
// notMsqCompatible();

// Fully removing the join allows this query to vectorize.
if (!isRewriteJoinToFilter(queryContext)) {
Expand Down Expand Up @@ -4276,7 +4276,7 @@ public void testUnionAllTwoQueriesRightQueryIsJoin(Map<String, Object> queryCont
public void testUnionAllTwoQueriesBothQueriesAreJoin()
{
// MSQ does not support UNION ALL.
notMsqCompatible();
// notMsqCompatible();
cannotVectorize();

testQuery(
Expand Down

0 comments on commit 92f35c0

Please sign in to comment.