Skip to content

Commit

Permalink
Support stripe with row count greater than int32 (#11314)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11314

Currently the stripe offset is typed as int32, so trying to read a stripe
with a row count greater than int32 can hold results in a crash. Fix this by
using int64 for the offset types.

Reviewed By: xiaoxmeng, HuamengJiang

Differential Revision: D64720132

fbshipit-source-id: cfd269c82c097c8fa09389cdb420bf161956e873
  • Loading branch information
Yuhta authored and facebook-github-bot committed Oct 22, 2024
1 parent f93eae6 commit 1fc46ad
Show file tree
Hide file tree
Showing 44 changed files with 133 additions and 151 deletions.
2 changes: 1 addition & 1 deletion velox/dwio/common/FormatData.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class FormatData {
/// is in FormatData the provider is at end. For ORC/DWRF the type
/// dependent stream positions are accessed via the provider. The
/// provider is valid until next call of this.
virtual dwio::common::PositionProvider seekToRowGroup(uint32_t index) = 0;
virtual dwio::common::PositionProvider seekToRowGroup(int64_t index) = 0;

struct FilterRowGroupsResult {
std::vector<uint64_t> filterResult;
Expand Down
8 changes: 3 additions & 5 deletions velox/dwio/common/SelectiveByteRleColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,8 @@ class SelectiveByteRleColumnReader : public SelectiveColumnReader {
ExtractValues extractValues);

template <typename Reader, bool kEncodingHasNulls>
void readCommon(
vector_size_t offset,
const RowSet& rows,
const uint64_t* incomingNulls);
void
readCommon(int64_t offset, const RowSet& rows, const uint64_t* incomingNulls);
};

template <
Expand Down Expand Up @@ -163,7 +161,7 @@ void SelectiveByteRleColumnReader::processValueHook(

template <typename Reader, bool kEncodingHasNulls>
void SelectiveByteRleColumnReader::readCommon(
vector_size_t offset,
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
prepareRead<int8_t>(offset, rows, incomingNulls);
Expand Down
8 changes: 4 additions & 4 deletions velox/dwio/common/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ const std::vector<SelectiveColumnReader*>& SelectiveColumnReader::children()
return empty;
}

void SelectiveColumnReader::seekTo(vector_size_t offset, bool readsNullsOnly) {
void SelectiveColumnReader::seekTo(int64_t offset, bool readsNullsOnly) {
if (offset == readOffset_) {
return;
}
Expand Down Expand Up @@ -455,7 +455,7 @@ void SelectiveColumnReader::resetFilterCaches() {
}

void SelectiveColumnReader::addParentNulls(
int32_t firstRowInNulls,
int64_t firstRowInNulls,
const uint64_t* nulls,
const RowSet& rows) {
const int32_t firstNullIndex =
Expand All @@ -466,8 +466,8 @@ void SelectiveColumnReader::addParentNulls(
}

void SelectiveColumnReader::addSkippedParentNulls(
vector_size_t from,
vector_size_t to,
int64_t from,
int64_t to,
int32_t numNulls) {
auto rowsPerRowGroup = formatData_->rowsPerRowGroup();
if (rowsPerRowGroup.has_value() &&
Expand Down
25 changes: 11 additions & 14 deletions velox/dwio/common/SelectiveColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,8 @@ class SelectiveColumnReader {
// relative to 'offset', so that row 0 is the 'offset'th row from
// start of stripe. 'rows' is expected to stay constant
// between this and the next call to read.
virtual void read(
vector_size_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) = 0;
virtual void
read(int64_t offset, const RowSet& rows, const uint64_t* incomingNulls) = 0;

virtual uint64_t skip(uint64_t numValues) {
return formatData_->skip(numValues);
Expand All @@ -193,14 +191,14 @@ class SelectiveColumnReader {

// Advances to 'offset', so that the next item to be read is the
// offset-th from the start of stripe.
virtual void seekTo(vector_size_t offset, bool readsNullsOnly);
virtual void seekTo(int64_t offset, bool readsNullsOnly);

/// Positions this at the start of 'index'th row group. Interpretation of
/// 'index' depends on format. Clears counts of skipped enclosing struct nulls
/// for formats where nulls are recorded at each nesting level, i.e. not
/// rep-def.
virtual void seekToRowGroup(uint32_t index) {
VELOX_TRACE_HISTORY_PUSH("seekToRowGroup %u", index);
virtual void seekToRowGroup(int64_t index) {
VELOX_TRACE_HISTORY_PUSH("seekToRowGroup %" PRId64, index);
numParentNulls_ = 0;
parentNullsRecordedTo_ = 0;
}
Expand Down Expand Up @@ -361,11 +359,11 @@ class SelectiveColumnReader {
return readOffset_;
}

void setReadOffset(vector_size_t readOffset) {
void setReadOffset(int64_t readOffset) {
readOffset_ = readOffset;
}

virtual void setReadOffsetRecursive(int32_t readOffset) {
virtual void setReadOffsetRecursive(int64_t readOffset) {
setReadOffset(readOffset);
}

Expand Down Expand Up @@ -448,16 +446,15 @@ class SelectiveColumnReader {
/// level rows and represents all null parents at any enclosing level. 'nulls'
/// is nullptr if there are no parent nulls.
void addParentNulls(
int32_t firstRowInNulls,
int64_t firstRowInNulls,
const uint64_t* nulls,
const RowSet& rows);

// When skipping rows in a struct, records how many parent nulls at
// any level there are between top level row 'from' and 'to'. If
// called many times, the 'from' of the next should be the 'to' of
// the previous.
void
addSkippedParentNulls(vector_size_t from, vector_size_t to, int32_t numNulls);
void addSkippedParentNulls(int64_t from, int64_t to, int32_t numNulls);

static constexpr int8_t kNoValueSize = -1;
static constexpr uint32_t kRowGroupNotSet = ~0;
Expand Down Expand Up @@ -508,7 +505,7 @@ class SelectiveColumnReader {
protected:
template <typename T>
void prepareRead(
vector_size_t offset,
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls);

Expand Down Expand Up @@ -618,7 +615,7 @@ class SelectiveColumnReader {

// Row number after last read row, relative to the ORC stripe or Parquet
// Rowgroup start.
vector_size_t readOffset_ = 0;
int64_t readOffset_ = 0;

// Number of parent nulls between 'readOffset_' and 'parentNullsRecordedTo_'.
// When skipping, subtract the parent nulls from the skip distance because the
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/SelectiveColumnReaderInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void SelectiveColumnReader::ensureValuesCapacity(vector_size_t numRows) {

template <typename T>
void SelectiveColumnReader::prepareRead(
vector_size_t offset,
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
const bool readsNullsOnly = this->readsNullsOnly();
Expand Down
8 changes: 3 additions & 5 deletions velox/dwio/common/SelectiveFloatingPointColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ class SelectiveFloatingPointColumnReader : public SelectiveColumnReader {
}

template <typename Reader, bool kEncodingHasNulls>
void readCommon(
vector_size_t offset,
const RowSet& rows,
const uint64_t* incomingNulls);
void
readCommon(int64_t offset, const RowSet& rows, const uint64_t* incomingNulls);

void getValues(const RowSet& rows, VectorPtr* result) override {
getFlatValues<TData, TRequested>(rows, result, requestedType_);
Expand Down Expand Up @@ -174,7 +172,7 @@ void SelectiveFloatingPointColumnReader<TData, TRequested>::processValueHook(
template <typename TData, typename TRequested>
template <typename Reader, bool kEncodingHasNulls>
void SelectiveFloatingPointColumnReader<TData, TRequested>::readCommon(
vector_size_t offset,
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
prepareRead<TData>(offset, rows, incomingNulls);
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/common/SelectiveRepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ uint64_t SelectiveListColumnReader::skip(uint64_t numValues) {
}

void SelectiveListColumnReader::read(
vector_size_t offset,
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
// Catch up if the child is behind the length stream.
Expand Down Expand Up @@ -287,7 +287,7 @@ uint64_t SelectiveMapColumnReader::skip(uint64_t numValues) {
}

void SelectiveMapColumnReader::read(
vector_size_t offset,
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
// Catch up if child readers are behind the length stream.
Expand Down
14 changes: 5 additions & 9 deletions velox/dwio/common/SelectiveRepeatedColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class SelectiveRepeatedColumnReader : public SelectiveColumnReader {
// length stream. The child readers can be behind if the last parents were
// null, so that the child stream was only read up to the last position
// corresponding to the last non-null parent.
vector_size_t childTargetReadOffset_ = 0;
int64_t childTargetReadOffset_ = 0;
std::vector<SelectiveColumnReader*> children_;
};

Expand All @@ -96,10 +96,8 @@ class SelectiveListColumnReader : public SelectiveRepeatedColumnReader {

uint64_t skip(uint64_t numValues) override;

void read(
vector_size_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) override;
void read(int64_t offset, const RowSet& rows, const uint64_t* incomingNulls)
override;

void getValues(const RowSet& rows, VectorPtr* result) override;

Expand All @@ -122,10 +120,8 @@ class SelectiveMapColumnReader : public SelectiveRepeatedColumnReader {

uint64_t skip(uint64_t numValues) override;

void read(
vector_size_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) override;
void read(int64_t offset, const RowSet& rows, const uint64_t* incomingNulls)
override;

void getValues(const RowSet& rows, VectorPtr* result) override;

Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/common/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void SelectiveStructColumnReaderBase::next(
}

void SelectiveStructColumnReaderBase::read(
vector_size_t offset,
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
numReads_ = scanSpec_->newRead();
Expand Down Expand Up @@ -247,7 +247,7 @@ void SelectiveStructColumnReaderBase::read(
}

void SelectiveStructColumnReaderBase::recordParentNullsInChildren(
vector_size_t offset,
int64_t offset,
const RowSet& rows) {
if (formatData_->parentNullsInLeaves()) {
return;
Expand Down
20 changes: 9 additions & 11 deletions velox/dwio/common/SelectiveStructColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,33 +40,31 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
const dwio::common::StatsContext& context,
FormatData::FilterRowGroupsResult&) const override;

void read(
vector_size_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) override;
void read(int64_t offset, const RowSet& rows, const uint64_t* incomingNulls)
override;

void getValues(const RowSet& rows, VectorPtr* result) override;

uint64_t numReads() const {
return numReads_;
}

vector_size_t lazyVectorReadOffset() const {
int64_t lazyVectorReadOffset() const {
return lazyVectorReadOffset_;
}

/// Advance field reader to the row group closest to specified offset by
/// calling seekToRowGroup.
virtual void advanceFieldReader(
SelectiveColumnReader* reader,
vector_size_t offset) = 0;
int64_t offset) = 0;

// Returns the nulls bitmap from reading this. Used in LazyVector loaders.
const uint64_t* nulls() const {
return nullsInReadRange_ ? nullsInReadRange_->as<uint64_t>() : nullptr;
}

void setReadOffsetRecursive(vector_size_t readOffset) override {
void setReadOffsetRecursive(int64_t readOffset) override {
readOffset_ = readOffset;
for (auto& child : children_) {
child->setReadOffsetRecursive(readOffset);
Expand Down Expand Up @@ -127,7 +125,7 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
/// each child reader and the end of the range of 'read(). This must be done
/// also if a child is not read so that we know how much to skip when seeking
/// forward within the row group.
void recordParentNullsInChildren(vector_size_t offset, const RowSet& rows);
void recordParentNullsInChildren(int64_t offset, const RowSet& rows);

bool hasDeletion() const final {
return hasDeletion_;
Expand Down Expand Up @@ -162,7 +160,7 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
// created by 'this' to verify they are still valid at load.
uint64_t numReads_ = 0;

vector_size_t lazyVectorReadOffset_;
int64_t lazyVectorReadOffset_;

// Dense set of rows to read in next().
raw_vector<vector_size_t> rows_;
Expand Down Expand Up @@ -221,7 +219,7 @@ class SelectiveFlatMapColumnReaderHelper {
}
}

void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls);
void read(int64_t offset, RowSet rows, const uint64_t* incomingNulls);

void getValues(RowSet rows, VectorPtr* result);

Expand Down Expand Up @@ -275,7 +273,7 @@ class SelectiveFlatMapColumnReaderHelper {

template <typename T, typename KeyNode, typename FormatData>
void SelectiveFlatMapColumnReaderHelper<T, KeyNode, FormatData>::read(
vector_size_t offset,
int64_t offset,
RowSet rows,
const uint64_t* incomingNulls) {
reader_.numReads_ = reader_.scanSpec_->newRead();
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/reader/DwrfData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void DwrfData::ensureRowGroupIndex() {
}
}

dwio::common::PositionProvider DwrfData::seekToRowGroup(uint32_t index) {
dwio::common::PositionProvider DwrfData::seekToRowGroup(int64_t index) {
ensureRowGroupIndex();

positionsHolder_ = toPositionsInner(index_->entry(index));
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/reader/DwrfData.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class DwrfData : public dwio::common::FormatData {

/// Seeks possible flat map in map streams and nulls to the row group
/// and returns a PositionsProvider for the other streams.
dwio::common::PositionProvider seekToRowGroup(uint32_t index) override;
dwio::common::PositionProvider seekToRowGroup(int64_t index) override;

int64_t stripeRows() const {
return stripeRows_;
Expand Down
8 changes: 3 additions & 5 deletions velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class SelectiveByteRleColumnReader
}
}

void seekToRowGroup(uint32_t index) override {
void seekToRowGroup(int64_t index) override {
dwio::common::SelectiveByteRleColumnReader::seekToRowGroup(index);
auto positionsProvider = formatData_->seekToRowGroup(index);
if (boolRle_) {
Expand All @@ -78,10 +78,8 @@ class SelectiveByteRleColumnReader
return numValues;
}

void read(
vector_size_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) override {
void read(int64_t offset, const RowSet& rows, const uint64_t* incomingNulls)
override {
readCommon<SelectiveByteRleColumnReader, true>(offset, rows, incomingNulls);
readOffset_ += rows.back() + 1;
}
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ uint64_t SelectiveDecimalColumnReader<DataT>::skip(uint64_t numValues) {
}

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(uint32_t index) {
void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(int64_t index) {
SelectiveColumnReader::seekToRowGroup(index);
auto positionsProvider = formatData_->seekToRowGroup(index);
valueDecoder_->seekToRowGroup(positionsProvider);
Expand Down Expand Up @@ -108,7 +108,7 @@ void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {

template <typename DataT>
void SelectiveDecimalColumnReader<DataT>::read(
vector_size_t offset,
int64_t offset,
const RowSet& rows,
const uint64_t* incomingNulls) {
VELOX_CHECK(!scanSpec_->filter());
Expand Down
5 changes: 2 additions & 3 deletions velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ class SelectiveDecimalColumnReader : public SelectiveColumnReader {
return version_ != velox::dwrf::RleVersion_2;
}

void seekToRowGroup(uint32_t index) override;
void seekToRowGroup(int64_t index) override;
uint64_t skip(uint64_t numValues) override;

void read(vector_size_t offset, const RowSet& rows, const uint64_t* nulls)
override;
void read(int64_t offset, const RowSet& rows, const uint64_t* nulls) override;

void getValues(const RowSet& rows, VectorPtr* result) override;

Expand Down
Loading

0 comments on commit 1fc46ad

Please sign in to comment.