Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla committed Sep 27, 2023
1 parent 9b31be6 commit 7e60702
Show file tree
Hide file tree
Showing 10 changed files with 757 additions and 559 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
Expand Down Expand Up @@ -170,6 +171,18 @@ public static DataSourcePlan forDataSource(
minStageNumber,
broadcast
);
} else if (dataSource instanceof UnionDataSource) {
return forUnion(
queryKit,
queryId,
queryContext,
(UnionDataSource) dataSource,
querySegmentSpec,
filter,
maxWorkerCount,
minStageNumber,
broadcast
);
} else if (dataSource instanceof JoinDataSource) {
final JoinAlgorithm preferredJoinAlgorithm = PlannerContext.getJoinAlgorithm(queryContext);
final JoinAlgorithm deducedJoinAlgorithm = deduceJoinAlgorithm(
Expand Down Expand Up @@ -458,6 +471,54 @@ private static DataSourcePlan forUnnest(
);
}

private static DataSourcePlan forUnion(
final QueryKit queryKit,
final String queryId,
final QueryContext queryContext,
final UnionDataSource unionDataSource,
final QuerySegmentSpec querySegmentSpec,
@Nullable DimFilter filter,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
)
{
// This is done to prevent loss of generality since MSQ can plan any type of DataSource.
List<DataSource> children = unionDataSource.getDataSources();

final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder();
final List<DataSource> newChildren = new ArrayList<>();
final List<InputSpec> inputSpecs = new ArrayList<>();
final IntSet broadcastInputs = new IntOpenHashSet();

for (DataSource child : children) {
DataSourcePlan childDataSourcePlan = forDataSource(
queryKit,
queryId,
queryContext,
child,
querySegmentSpec,
filter,
maxWorkerCount,
Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()),
broadcast
);

int shift = inputSpecs.size();

newChildren.add(shiftInputNumbers(childDataSourcePlan.getNewDataSource(), shift));
inputSpecs.addAll(childDataSourcePlan.getInputSpecs());
childDataSourcePlan.getSubQueryDefBuilder().ifPresent(subqueryDefBuilder::addAll);
childDataSourcePlan.getBroadcastInputs().forEach(inp -> broadcastInputs.add(inp + shift));
}
return new DataSourcePlan(
new UnionDataSource(newChildren),
inputSpecs,
broadcastInputs,
subqueryDefBuilder
);
}

/**
* Build a plan for broadcast hash-join.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,4 @@ public void testArrayAggQueryOnComplexDatatypes()
}
}

/**
* Doesn't pass through Druid however the planning error is different as it rewrites to a union datasource.
* This test is disabled because MSQ wants to support union datasources, and it makes little sense to add highly
* conditional planning error for the same. Planning errors are merely hints, and this is one of those times
* when the hint is incorrect till MSQ starts supporting the union datasource.
*/
@Test
@Override
public void testUnionIsUnplannable()
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.CalciteUnionQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
* Runs {@link CalciteUnionQueryTest} but with MSQ engine
*/
public class CalciteUnionQueryMSQTest extends CalciteUnionQueryTest
{
private TestGroupByBuffers groupByBuffers;

@Before
public void setup2()
{
groupByBuffers = TestGroupByBuffers.createDefault();
}

@After
public void teardown2()
{
groupByBuffers.close();
}

@Override
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModules(CalciteMSQTestsHelper.fetchModules(temporaryFolder, groupByBuffers).toArray(new Module[0]));
}


@Override
public SqlEngine createEngine(
QueryLifecycleFactory qlf,
ObjectMapper queryJsonMapper,
Injector injector
)
{
final WorkerMemoryParameters workerMemoryParameters =
WorkerMemoryParameters.createInstance(
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
2,
10,
2,
0,
0
);
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper),
workerMemoryParameters
);
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
}

@Override
protected QueryTestBuilder testBuilder()
{
return new QueryTestBuilder(new BaseCalciteQueryTest.CalciteTestConfig(true))
.addCustomRunner(new ExtractResultsFactory(() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient()))
.skipVectorize(true)
.verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate())
.msqCompatible(msqCompatible);
}

/**
* Doesn't pass through Druid however the planning error is different as it rewrites to a union datasource.
* This test is disabled because MSQ wants to support union datasources, and it makes little sense to add highly
* conditional planning error for the same. Planning errors are merely hints, and this is one of those times
* when the hint is incorrect till MSQ starts supporting the union datasource.
*/
@Test
@Override
public void testUnionIsUnplannable()
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.planning.DataSourceAnalysis;
Expand All @@ -36,13 +37,16 @@
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* TODO(laksh):
*/
public class UnionDataSource implements DataSource
{
@JsonProperty
private final List<TableDataSource> dataSources;
private final List<DataSource> dataSources;

@JsonCreator
public UnionDataSource(@JsonProperty("dataSources") List<TableDataSource> dataSources)
public UnionDataSource(@JsonProperty("dataSources") List<DataSource> dataSources)
{
if (dataSources == null || dataSources.isEmpty()) {
throw new ISE("'dataSources' must be non-null and non-empty for 'union'");
Expand All @@ -51,18 +55,38 @@ public UnionDataSource(@JsonProperty("dataSources") List<TableDataSource> dataSo
this.dataSources = dataSources;
}

public List<DataSource> getDataSources()
{
return dataSources;
}


// TODO: native only method
@Override
public Set<String> getTableNames()
{
return dataSources.stream()
.map(input -> Iterables.getOnlyElement(input.getTableNames()))
.map(input -> {
if (!(input instanceof TableDataSource)) {
throw DruidException.defensive("should be table");
}
return Iterables.getOnlyElement(input.getTableNames());
})
.collect(Collectors.toSet());
}

// TODO: native only method
@JsonProperty
public List<TableDataSource> getDataSources()
public List<TableDataSource> getDataSourcesAsTableDataSources()
{
return dataSources;
return dataSources.stream()
.map(input -> {
if (!(input instanceof TableDataSource)) {
throw DruidException.defensive("should be table");
}
return (TableDataSource) input;
})
.collect(Collectors.toList());
}

@Override
Expand All @@ -78,10 +102,6 @@ public DataSource withChildren(List<DataSource> children)
throw new IAE("Expected [%d] children, got [%d]", dataSources.size(), children.size());
}

if (!children.stream().allMatch(dataSource -> dataSource instanceof TableDataSource)) {
throw new IAE("All children must be tables");
}

return new UnionDataSource(
children.stream().map(dataSource -> (TableDataSource) dataSource).collect(Collectors.toList())
);
Expand Down Expand Up @@ -149,11 +169,7 @@ public boolean equals(Object o)

UnionDataSource that = (UnionDataSource) o;

if (!dataSources.equals(that.dataSources)) {
return false;
}

return true;
return dataSources.equals(that.dataSources);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext respo

final UnionDataSource unionDataSource = analysis.getBaseUnionDataSource().get();

if (unionDataSource.getDataSources().isEmpty()) {
if (unionDataSource.getDataSourcesAsTableDataSources().isEmpty()) {
// Shouldn't happen, because UnionDataSource doesn't allow empty unions.
throw new ISE("Unexpectedly received empty union");
} else if (unionDataSource.getDataSources().size() == 1) {
} else if (unionDataSource.getDataSourcesAsTableDataSources().size() == 1) {
// Single table. Run as a normal query.
return baseRunner.run(
queryPlus.withQuery(
Queries.withBaseDataSource(
query,
Iterables.getOnlyElement(unionDataSource.getDataSources())
Iterables.getOnlyElement(unionDataSource.getDataSourcesAsTableDataSources())
)
),
responseContext
Expand All @@ -77,8 +77,8 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext respo
query.getResultOrdering(),
Sequences.simple(
Lists.transform(
IntStream.range(0, unionDataSource.getDataSources().size())
.mapToObj(i -> new Pair<>(unionDataSource.getDataSources().get(i), i + 1))
IntStream.range(0, unionDataSource.getDataSourcesAsTableDataSources().size())
.mapToObj(i -> new Pair<>(unionDataSource.getDataSourcesAsTableDataSources().get(i), i + 1))
.collect(Collectors.toList()),
(Function<Pair<TableDataSource, Integer>, Sequence<T>>) singleSourceWithIndex ->
baseRunner.run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testUnionDataSource() throws Exception
Assert.assertTrue(dataSource instanceof UnionDataSource);
Assert.assertEquals(
Lists.newArrayList(new TableDataSource("ds1"), new TableDataSource("ds2")),
Lists.newArrayList(((UnionDataSource) dataSource).getDataSources())
Lists.newArrayList(((UnionDataSource) dataSource).getDataSourcesAsTableDataSources())
);
Assert.assertEquals(
ImmutableSet.of("ds1", "ds2"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void test_withChildren_empty()
@Test
public void test_withChildren_sameNumber()
{
final List<TableDataSource> newDataSources = ImmutableList.of(
final List<DataSource> newDataSources = ImmutableList.of(
new TableDataSource("baz"),
new TableDataSource("qux")
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public DruidUnionDataSourceRel withPartialQuery(final PartialDruidQuery newQuery
@Override
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
final List<TableDataSource> dataSources = new ArrayList<>();
final List<DataSource> dataSources = new ArrayList<>();
RowSignature signature = null;

for (final RelNode relNode : unionRel.getInputs()) {
Expand Down
Loading

0 comments on commit 7e60702

Please sign in to comment.