Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla committed Oct 4, 2023
1 parent cdd63cf commit bcef1aa
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 111 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.query;

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
Expand All @@ -30,6 +31,8 @@
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;

import java.io.Closeable;

/**
* Helper methods to create cursor from iterable of rows
*/
Expand All @@ -40,7 +43,7 @@ public class IterableRowsCursorHelper
* Creates a cursor that iterates over all the rows generated by the iterable. Presence of __time column is not a
* necessity
*/
public static CursorAndCloseable getCursorFromIterable(Iterable<Object[]> rows, RowSignature rowSignature)
public static Pair<Cursor, Closeable> getCursorFromIterable(Iterable<Object[]> rows, RowSignature rowSignature)
{
return getCursorFromSequence(Sequences.simple(rows), rowSignature);
}
Expand All @@ -49,10 +52,9 @@ public static CursorAndCloseable getCursorFromIterable(Iterable<Object[]> rows,
* Creates a cursor that iterates over all the rows generated by the sequence. Presence of __time column is not a
* necessity.
* <p>
* Returns a {@link CursorAndCloseable} that iterates over the rows and cleans up the created rowWalker when
* {@link CursorAndCloseable#close()} is called on the returned object.
* Returns a pair of cursor that iterates over the rows and closeable that cleans up the created rowWalker
*/
public static CursorAndCloseable getCursorFromSequence(Sequence<Object[]> rows, RowSignature rowSignature)
public static Pair<Cursor, Closeable> getCursorFromSequence(Sequence<Object[]> rows, RowSignature rowSignature)
{
RowAdapter<Object[]> rowAdapter = columnName -> {
if (rowSignature == null) {
Expand All @@ -78,6 +80,6 @@ public static CursorAndCloseable getCursorFromSequence(Sequence<Object[]> rows,
rowSignature != null ? rowSignature : RowSignature.empty()
);

return CursorAndCloseable.create(baseCursor, rowWalker);
return Pair.of(baseCursor, rowWalker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.MappedSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.CursorAndCloseable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.IterableRowsCursorHelper;
Expand All @@ -68,10 +68,12 @@
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.DateTime;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
Expand Down Expand Up @@ -726,12 +728,14 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
);


CursorAndCloseable cursor = IterableRowsCursorHelper.getCursorFromSequence(
Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromSequence(
resultsAsArrays(query, resultSequence),
rowSignature
);
Cursor cursor = cursorAndCloseable.lhs;
Closeable closeble = cursorAndCloseable.rhs;

Sequence<Frame> frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).withBaggage(cursor);
Sequence<Frame> frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).withBaggage(closeble);

return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.CursorAndCloseable;
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.IterableRowsCursorHelper;
Expand All @@ -57,6 +57,7 @@
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.utils.CloseableUtils;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -284,11 +285,15 @@ private Sequence<FrameSignaturePair> convertScanResultValuesToFrame(
final Function<?, Object[]> mapper = getResultFormatMapper(query.getResultFormat(), rowSignature.getColumnNames());
final Iterable<Object[]> formattedRows = Lists.newArrayList(Iterables.transform(rows, (Function) mapper));

CursorAndCloseable cursor = IterableRowsCursorHelper.getCursorFromIterable(formattedRows, rowSignature);
Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable(
formattedRows,
rowSignature
);
Cursor cursor = cursorAndCloseable.lhs;
Closeable closeable = cursorAndCloseable.rhs;
cursors.add(cursor);

// Cursors created from iterators don't have any resources, therefore this is mostly a defensive check
closer.register(cursor);
closer.register(closeable);
}

RowSignature modifiedRowSignature = useNestedForUnknownTypes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.CursorAndCloseable;
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.IterableRowsCursorHelper;
import org.apache.druid.query.Query;
Expand All @@ -59,12 +59,14 @@
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.DateTime;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -474,10 +476,12 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
)
{
final RowSignature rowSignature = resultArraySignature(query);
final CursorAndCloseable cursor = IterableRowsCursorHelper.getCursorFromSequence(
final Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromSequence(
resultsAsArrays(query, resultSequence),
rowSignature
);
final Cursor cursor = cursorAndCloseable.lhs;
final Closeable closeable = cursorAndCloseable.rhs;

RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
Expand All @@ -489,7 +493,7 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
new ArrayList<>()
);

Sequence<Frame> frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).withBaggage(cursor);
Sequence<Frame> frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).withBaggage(closeable);

// All frames are generated with the same signature therefore we can attach the row signature
return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.BySegmentResultValue;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.CursorAndCloseable;
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.IterableRowsCursorHelper;
import org.apache.druid.query.Query;
Expand All @@ -60,10 +60,12 @@
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.DateTime;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -558,10 +560,12 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
)
{
final RowSignature rowSignature = resultArraySignature(query);
final CursorAndCloseable cursor = IterableRowsCursorHelper.getCursorFromSequence(
final Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromSequence(
resultsAsArrays(query, resultSequence),
rowSignature
);
Cursor cursor = cursorAndCloseable.lhs;
Closeable closeable = cursorAndCloseable.rhs;

RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
Expand All @@ -573,7 +577,7 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
new ArrayList<>()
);

Sequence<Frame> frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).withBaggage(cursor);
Sequence<Frame> frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).withBaggage(closeable);

return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.ColumnType;
Expand All @@ -40,6 +41,7 @@
import org.junit.Assert;
import org.junit.Test;

import java.io.Closeable;
import java.util.ArrayList;

public class FrameBasedInlineDataSourceSerializerTest
Expand Down Expand Up @@ -124,10 +126,11 @@ private FrameBasedInlineDataSource convertToFrameBasedInlineDataSource(
RowSignature rowSignature
)
{
Cursor cursor = IterableRowsCursorHelper.getCursorFromIterable(
Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable(
inlineDataSource.getRows(),
rowSignature
);
Cursor cursor = cursorAndCloseable.lhs;
RowSignature modifiedRowSignature = FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature);
Sequence<Frame> frames = FrameCursorUtils.cursorToFrames(
cursor,
Expand All @@ -139,7 +142,7 @@ private FrameBasedInlineDataSource convertToFrameBasedInlineDataSource(
)
);
return new FrameBasedInlineDataSource(
frames.map(frame -> new FrameSignaturePair(frame, rowSignature)).toList(),
frames.map(frame -> new FrameSignaturePair(frame, rowSignature)).withBaggage(cursorAndCloseable.rhs).toList(),
modifiedRowSignature
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ public class IterableRowsCursorHelperTest
@Test
public void getCursorFromIterable()
{
Cursor cursor = IterableRowsCursorHelper.getCursorFromIterable(rows, rowSignature);
Cursor cursor = IterableRowsCursorHelper.getCursorFromIterable(rows, rowSignature).lhs;
testCursorMatchesRowSequence(cursor, rowSignature, rows);
}

@Test
public void getCursorFromSequence()
{

Cursor cursor = IterableRowsCursorHelper.getCursorFromSequence(Sequences.simple(rows), rowSignature);
Cursor cursor = IterableRowsCursorHelper.getCursorFromSequence(Sequences.simple(rows), rowSignature).lhs;
testCursorMatchesRowSequence(cursor, rowSignature, rows);
}

Expand Down
Loading

0 comments on commit bcef1aa

Please sign in to comment.