Skip to content

Commit

Permalink
review round 1, javadocs, refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla committed Oct 3, 2023
1 parent ed972f6 commit 9422069
Show file tree
Hide file tree
Showing 18 changed files with 175 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

/**
* Reads the values produced by {@link DoubleFieldWriter}.
*
* @see DoubleFieldWriter
* @see NumericFieldWriter for the details of the byte-format that it expects for reading
*/
public class DoubleFieldReader extends NumericFieldReader
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

/**
* Wraps a {@link BaseDoubleColumnValueSelector} and writes field values.
*
* @see NumericFieldWriter for the details of the byte-format that it writes
*/
public class DoubleFieldWriter extends NumericFieldWriter
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ public static FieldReader create(final String columnName, final ColumnType colum
return DoubleFieldReader.forPrimitive();

case STRING:
return new StringFieldReader(false);
return new StringFieldReader();

case COMPLEX:
return ComplexFieldReader.createFromType(columnType);

case ARRAY:
switch (Preconditions.checkNotNull(columnType.getElementType().getType(), "array elementType")) {
case STRING:
return new StringFieldReader(true);
return new StringArrayFieldReader();

case LONG:
return new LongArrayFieldReader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public static FieldWriter create(
}

switch (columnType.getType()) {

case LONG:
return makeLongWriter(columnSelectorFactory, columnName);

Expand All @@ -81,18 +80,13 @@ public static FieldWriter create(
return makeComplexWriter(columnSelectorFactory, columnName, columnType.getComplexTypeName());

case ARRAY:

switch (columnType.getElementType().getType()) {

case STRING:
return makeStringArrayWriter(columnSelectorFactory, columnName);

case LONG:
return makeLongArrayWriter(columnSelectorFactory, columnName);

case FLOAT:
return makeFloatArrayWriter(columnSelectorFactory, columnName);

case DOUBLE:
return makeDoubleArrayWriter(columnSelectorFactory, columnName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

/**
* Reads values written by {@link FloatFieldWriter}.
*
* @see FloatFieldWriter
* @see NumericFieldWriter for the details of the byte-format that it expects for reading
*/
public class FloatFieldReader extends NumericFieldReader
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

/**
* Wraps a {@link BaseFloatColumnValueSelector} and writes field values.
*
* @see NumericFieldWriter for the details of the byte-format that it writes
*/
public class FloatFieldWriter extends NumericFieldWriter
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

/**
* Reads values written by {@link LongFieldWriter}.
*
* @see LongFieldWriter
* @see NumericFieldWriter for the details of the byte-format that it expects for reading
*/
public class LongFieldReader extends NumericFieldReader
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

/**
* Wraps a {@link BaseLongColumnValueSelector} and writes individual values into frame rows.
*
* @see NumericFieldWriter for the details of the byte-format that it writes
*/
public class LongFieldWriter extends NumericFieldWriter
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
* After all the values of a non-null array have been written, writes {@link #ARRAY_TERMINATOR}. This aids the byte
* comparison, and also lets the reader know that there are no more elements in the array.
* <p>
* The format doesn't add the number of elements in the array at the beginning, though that would have been more
* convenient to keep the written array value byte comparable
* The format doesn't add the number of elements in the array at the beginning, so that the serialized arrays are
* byte-by-byte comparable.
* <p>
* Examples:
* 1. null
Expand Down Expand Up @@ -138,7 +138,7 @@ public long writeTo(WritableMemory memory, long position, long maxSize)
return requiredSize;
} else {

List<? extends Number> list = FrameWriterUtils.getNumericArrayFromNumericArray(row);
List<? extends Number> list = FrameWriterUtils.getNumericArrayFromObject(row);

if (list == null) {
int requiredSize = Byte.BYTES;
Expand Down Expand Up @@ -211,6 +211,10 @@ public Class<? extends Number> classOfObject()

NumericFieldWriter writer = writerFactory.get(columnValueSelector);

// First byte is reserved for null marker of the array
// Next [(1 + Numeric Size) x Number of elements of array] bytes are reserved for the elements of the array and
// their null markers
// Last byte is reserved for array termination
int requiredSize = Byte.BYTES + (writer.getNumericSizeBytes() + Byte.BYTES) * list.size() + Byte.BYTES;

if (requiredSize > maxSize) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.frame.field;

/**
* Reads fields written by {@link StringArrayFieldWriter}.
* <p>
* This is a thin subclass of {@link StringFieldReader} that always constructs its parent with {@code asArray = true};
* it adds no state or behavior of its own.
*
* @see StringFieldReader for more details on the format that the reader expects
* @see StringFieldReader#StringFieldReader(boolean) for the selector that the reader returns
*/
public class StringArrayFieldReader extends StringFieldReader
{
/**
* Package-private constructor that delegates to {@link StringFieldReader#StringFieldReader(boolean)} with
* {@code true}.
*/
StringArrayFieldReader()
{
super(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,19 @@ public class StringFieldReader implements FieldReader
{
private final boolean asArray;

public StringFieldReader()
{
this(false);
}

/**
* Create a string reader.
*
* @param asArray if false, selectors from {@link #makeColumnValueSelector} behave like {@link ValueType#STRING}
* selectors (potentially multi-value ones). If true, selectors from {@link #makeColumnValueSelector}
* behave like string array selectors.
*/
StringFieldReader(final boolean asArray)
protected StringFieldReader(final boolean asArray)
{
this.asArray = asArray;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,16 @@
import org.apache.druid.frame.read.columnar.FrameColumnReaders;
import org.apache.druid.frame.segment.row.FrameCursorFactory;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
Expand All @@ -61,28 +57,15 @@ public class FrameReader
// Field readers, for row-based frames.
private final List<FieldReader> fieldReaders;

/**
* Currently, only ROW_BASED frames support numerical array columns, while the COLUMNAR frames do not. While creating
* a FrameReader, for types unsupported by COLUMNAR frames, we populate this field to denote that the FrameReader is
* "incomplete" and can't be used to read the columnar frame. However, the FrameReader performs as expected for the
* row-based frames.
* In short, this is a temporary measure till columnar frames support the numerical array types to punt the unsupported
* type check for the numerical arrays (for COLUMNAR frames only) at the usage time, rather than the creation time
*/
private final Optional<Pair<String, ColumnType>> unsupportedColumnAndType;


private FrameReader(
final RowSignature signature,
final List<FrameColumnReader> columnReaders,
final List<FieldReader> fieldReaders,
final Optional<Pair<String, ColumnType>> unsupportedColumnAndType
final List<FieldReader> fieldReaders
)
{
this.signature = signature;
this.columnReaders = columnReaders;
this.fieldReaders = fieldReaders;
this.unsupportedColumnAndType = unsupportedColumnAndType;
}

/**
Expand All @@ -105,7 +88,6 @@ public static FrameReader create(final RowSignature signature)

final List<FrameColumnReader> columnReaders = new ArrayList<>(signature.size());
final List<FieldReader> fieldReaders = new ArrayList<>(signature.size());
Optional<Pair<String, ColumnType>> unsupportedColumnAndType = Optional.empty();

for (int columnNumber = 0; columnNumber < signature.size(); columnNumber++) {
final ColumnType columnType =
Expand All @@ -116,27 +98,10 @@ public static FrameReader create(final RowSignature signature)
);

fieldReaders.add(FieldReaders.create(signature.getColumnName(columnNumber), columnType));

// If we encounter a numeric array type, then don't throw the error immediately since the reader can be used to
// read only the ROW_BASED frames. Rather, set the optional, and throw the appropriate error message when the reader
// tries to read COLUMNAR frame. This should go away once the COLUMNAR frames also start supporting the numeric
// array
if (columnType.getType() == ValueType.ARRAY
&& Preconditions.checkNotNull(
columnType.getElementType(),
"Element type for array column [%s]",
signature.getColumnName(columnNumber)
).getType().isNumeric()
) {
if (!unsupportedColumnAndType.isPresent()) {
unsupportedColumnAndType = Optional.of(Pair.of(signature.getColumnName(columnNumber), columnType));
}
} else {
columnReaders.add(FrameColumnReaders.create(columnNumber, columnType));
}
columnReaders.add(FrameColumnReaders.create(signature.getColumnName(columnNumber), columnNumber, columnType));
}

return new FrameReader(signature, columnReaders, fieldReaders, unsupportedColumnAndType);
return new FrameReader(signature, columnReaders, fieldReaders);
}

public RowSignature signature()
Expand All @@ -163,7 +128,6 @@ public ColumnCapabilities columnCapabilities(final Frame frame, final String col
case COLUMNAR:
// Better than frameReader.frameSignature().getColumnCapabilities(columnName), because this method has more
// insight into what's actually going on with this column (nulls, multivalue, etc).
throwIfIncompleteColumnReaders();
return columnReaders.get(columnNumber).readColumn(frame).getCapabilities();
default:
return signature.getColumnCapabilities(columnName);
Expand All @@ -178,7 +142,6 @@ public CursorFactory makeCursorFactory(final Frame frame)
{
switch (frame.type()) {
case COLUMNAR:
throwIfIncompleteColumnReaders();
return new org.apache.druid.frame.segment.columnar.FrameCursorFactory(frame, signature, columnReaders);
case ROW_BASED:
return new FrameCursorFactory(frame, this, fieldReaders);
Expand All @@ -199,13 +162,4 @@ public FrameComparisonWidget makeComparisonWidget(final Frame frame, final List<

return FrameComparisonWidgetImpl.create(frame, signature, keyColumns, fieldReaders.subList(0, keyColumns.size()));
}

private void throwIfIncompleteColumnReaders()
{
unsupportedColumnAndType.ifPresent(
val -> {
throw new UnsupportedColumnTypeException(val.lhs, val.rhs);
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@

package org.apache.druid.frame.read.columnar;

import org.apache.druid.java.util.common.UOE;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;

/**
* Creates {@link FrameColumnReader} corresponding to a given column type and number.
*
* Returns a dummy {@link UnsupportedColumnTypeFrameColumnReader} if the column type is not supported or unknown.
* Calling any method of the dummy reader will throw an exception with a relevant error message.
*/
public class FrameColumnReaders
{
private FrameColumnReaders()
{
// No instantiation.
}

public static FrameColumnReader create(final int columnNumber, final ColumnType columnType)
public static FrameColumnReader create(
final String columnName,
final int columnNumber,
final ColumnType columnType
)
{
switch (columnType.getType()) {
case LONG:
Expand All @@ -51,11 +60,12 @@ public static FrameColumnReader create(final int columnNumber, final ColumnType
case ARRAY:
if (columnType.getElementType().getType() == ValueType.STRING) {
return new StringFrameColumnReader(columnNumber, true);
} else {
return new UnsupportedColumnTypeFrameColumnReader(columnName, columnType);
}
// Fall through to error for other array types

default:
throw new UOE("Unsupported column type [%s]", columnType);
return new UnsupportedColumnTypeFrameColumnReader(columnName, columnType);
}
}
}
Loading

0 comments on commit 9422069

Please sign in to comment.