Skip to content

Commit

Permalink
arrayIngestMode
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla committed Oct 7, 2023
1 parent db2ac3a commit 9fd8258
Show file tree
Hide file tree
Showing 7 changed files with 463 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory;
import org.apache.druid.msq.shuffle.input.WorkerInputChannelFactory;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
import org.apache.druid.msq.util.ArrayIngestMode;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.msq.util.IntervalUtils;
import org.apache.druid.msq.util.MSQFutureUtils;
Expand Down Expand Up @@ -1928,6 +1929,17 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
final Query<?> query
)
{
// Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to
// deprecation and removal in future
if (MultiStageQueryContext.getArrayIngestMode(query.context()) == ArrayIngestMode.MVD) {
log.warn(
"'%s' is set to 'mvd' in the query's context. This ingests the string arrays as multi-value "
+ "strings instead of arrays, and is preserved for legacy reasons when MVDs were the only way to ingest string "
+ "arrays in Druid. It is incorrect behaviour and will likely be removed in the future releases of Druid",
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE
);
}

final List<DimensionSchema> dimensions = new ArrayList<>();
final List<AggregatorFactory> aggregators = new ArrayList<>();

Expand Down Expand Up @@ -2006,7 +2018,7 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
outputColumnName,
type,
MultiStageQueryContext.useAutoColumnSchemas(query.context()),
MultiStageQueryContext.isIngestStringArraysAsMVDs(query.context())
MultiStageQueryContext.getArrayIngestMode(query.context())
)
);
} else if (!isRollupQuery) {
Expand Down Expand Up @@ -2056,7 +2068,7 @@ private static void populateDimensionsAndAggregators(
outputColumn,
type,
MultiStageQueryContext.useAutoColumnSchemas(context),
MultiStageQueryContext.isIngestStringArraysAsMVDs(context)
MultiStageQueryContext.getArrayIngestMode(context)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,9 @@ private static Iterator<SegmentWithDescriptor> inputSourceSegmentIterator(
new DimensionsSpec(
signature.getColumnNames().stream().map(
column ->
DimensionSchemaUtils.createDimensionSchema(
DimensionSchemaUtils.createDimensionSchemaForExtern(
column,
signature.getColumnType(column).orElse(null),
false,
false
signature.getColumnType(column).orElse(null)
)
).collect(Collectors.toList())
),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.util;

/**
 * Modes accepted by the 'arrayIngestMode' query context flag, which governs how MSQ's INSERT queries treat
 * array-typed columns during ingestion.
 */
public enum ArrayIngestMode
{
  /**
   * Arrays cannot be ingested through MSQ's INSERT queries at all.
   */
  NONE,

  /**
   * Legacy mode: string arrays are written as MVDs rather than as arrays, and numeric arrays cannot be ingested.
   * MVDs are not true array types, so this behaviour is considered incorrect; the mode is retained only to preserve
   * older Druid behaviour and will be removed in the future.
   */
  MVD,

  /**
   * Both string and numeric arrays are ingested as arrays. Prefer this mode unless compatibility requirements
   * force the use of 'mvd'.
   */
  ARRAY
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnCapabilities;
Expand All @@ -40,11 +42,26 @@
*/
public class DimensionSchemaUtils
{

/**
 * Builds the {@link DimensionSchema} used when creating an {@link org.apache.druid.data.input.InputSourceReader}
 * for an external input source.
 *
 * @param column name of the column
 * @param type   type of the column, may be null
 *
 * @return the result of {@link #createDimensionSchema} called with auto types switched off and
 *         {@link ArrayIngestMode#ARRAY} as the array ingest mode
 */
public static DimensionSchema createDimensionSchemaForExtern(final String column, @Nullable final ColumnType type)
{
  // Auto column schemas are never requested on this path.
  final boolean useAutoType = false;
  // Least restrictive mode, since there are no type restrictions while reading the extern files.
  final ArrayIngestMode leastRestrictiveMode = ArrayIngestMode.ARRAY;

  return createDimensionSchema(column, type, useAutoType, leastRestrictiveMode);
}

public static DimensionSchema createDimensionSchema(
final String column,
@Nullable final ColumnType type,
boolean useAutoType,
boolean writeStringArraysAsMVDs
ArrayIngestMode arrayIngestMode
)
{
if (useAutoType) {
Expand Down Expand Up @@ -75,15 +92,33 @@ public static DimensionSchema createDimensionSchema(
case ARRAY:
switch (type.getElementType().getType()) {
case STRING:
if (writeStringArraysAsMVDs) {
if (arrayIngestMode == ArrayIngestMode.NONE) {
throw InvalidInput.exception(
"String arrays can not be ingested when '%s' is set to '%s'. Either set '%s' in query context "
+ "to 'array' to ingest the string array as an array, or set it to 'mvd' to ingest the string array "
+ "as MVD (which is legacy behaviour and not recommmended)",
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE,
StringUtils.toLowerCase(arrayIngestMode.name()),
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE
);
} else if (arrayIngestMode == ArrayIngestMode.MVD) {
return new StringDimensionSchema(column, DimensionSchema.MultiValueHandling.ARRAY, null);
} else {
} else if (arrayIngestMode == ArrayIngestMode.ARRAY) {
return new AutoTypeColumnSchema(column);
}
case LONG:
case FLOAT:
case DOUBLE:
return new AutoTypeColumnSchema(column);
if (arrayIngestMode == ArrayIngestMode.ARRAY) {
return new AutoTypeColumnSchema(column);
} else {
throw InvalidInput.exception(
"Numeric arrays can only be ingested when '%s' is set to 'array' in the MSQ query's context. "
+ "Current value of the parameter [%s]",
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE,
StringUtils.toLowerCase(arrayIngestMode.name())
);
}
default:
throw new ISE("Cannot create dimension for type [%s]", type.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@
* {@link org.apache.druid.segment.AutoTypeColumnSchema} for all 'standard' type columns during segment generation,
* see {@link DimensionSchemaUtils#createDimensionSchema} for more details.
*
* <li><b>ingestStringArraysAsMVDs</b>: Flag to ingest the string arrays using string dimension handlers, which generates
* MVDs instead of {@code ARRAY<STRING>}. The flag is undocumented, and provided to preserve legacy behaviour.
* see {@link DimensionSchemaUtils#createDimensionSchema} for more details.
*
* <li><b>arrayIngestMode</b>: Tri-state query context that controls the behaviour and support of arrays that are
* ingested via MSQ. If set to 'none', arrays are not allowed to be ingested in MSQ. If set to 'array', array types
* can be ingested as expected. If set to 'mvd', numeric arrays can not be ingested, and string arrays will be
* ingested as MVDs (this is kept for legacy purposes).
* </ol>
**/
public class MultiStageQueryContext
Expand Down Expand Up @@ -127,8 +127,8 @@ public class MultiStageQueryContext
public static final String CTX_USE_AUTO_SCHEMAS = "useAutoColumnSchemas";
public static final boolean DEFAULT_USE_AUTO_SCHEMAS = false;

public static final String CTX_INGEST_STRING_ARRAYS_AS_MVDS = "ingestStringArraysAsMVDs";
public static final boolean DEFAULT_INGEST_STRING_ARRAYS_AS_MVDS = false;
public static final String CTX_ARRAY_INGEST_MODE = "arrayIngestMode";
public static final ArrayIngestMode DEFAULT_ARRAY_INGEST_MODE = ArrayIngestMode.NONE;


private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);
Expand Down Expand Up @@ -255,9 +255,9 @@ public static boolean useAutoColumnSchemas(final QueryContext queryContext)
return queryContext.getBoolean(CTX_USE_AUTO_SCHEMAS, DEFAULT_USE_AUTO_SCHEMAS);
}

public static boolean isIngestStringArraysAsMVDs(final QueryContext queryContext)
public static ArrayIngestMode getArrayIngestMode(final QueryContext queryContext)
{
return queryContext.getBoolean(CTX_INGEST_STRING_ARRAYS_AS_MVDS, DEFAULT_INGEST_STRING_ARRAYS_AS_MVDS);
return queryContext.getEnum(CTX_ARRAY_INGEST_MODE, ArrayIngestMode.class, DEFAULT_ARRAY_INGEST_MODE);
}

/**
Expand Down
Loading

0 comments on commit 9fd8258

Please sign in to comment.