Skip to content

Commit

Permalink
Add TableScan Replayer (#11205)
Browse files Browse the repository at this point in the history
Summary:
Add a split tracer that records the input splits of a traced 'TableScan'
operator, and a reader that loads the traced splits when replaying 'TableScan'.
Currently, it only works with 'HiveConnectorSplit'. In the future, it will
be extended to handle more types of splits, such as 'IcebergHiveConnectorSplit'.

part of #9668

Pull Request resolved: #11205

Reviewed By: tanjialiang

Differential Revision: D64946986

Pulled By: xiaoxmeng

fbshipit-source-id: 5f656e5ad9b8755484c3eb5ac42b209569a98101
  • Loading branch information
duanmeng authored and facebook-github-bot committed Nov 10, 2024
1 parent 62e589e commit 12b52e7
Show file tree
Hide file tree
Showing 23 changed files with 985 additions and 50 deletions.
34 changes: 26 additions & 8 deletions velox/connectors/hive/HiveConnectorSplit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ folly::dynamic HiveConnectorSplit::serialize() const {
customSplitInfoObj[key] = value;
}
obj["customSplitInfo"] = customSplitInfoObj;
obj["extraFileInfo"] = *extraFileInfo;
obj["extraFileInfo"] =
extraFileInfo == nullptr ? nullptr : folly::dynamic(*extraFileInfo);

folly::dynamic serdeParametersObj = folly::dynamic::object;
for (const auto& [key, value] : serdeParameters) {
Expand All @@ -84,8 +85,14 @@ folly::dynamic HiveConnectorSplit::serialize() const {
? folly::dynamic(properties->modificationTime.value())
: nullptr;
obj["properties"] = propertiesObj;
} else {
obj["properties"] = nullptr;
}

if (rowIdProperties.has_value()) {
folly::dynamic rowIdObj = folly::dynamic::object;
rowIdObj["metadataVersion"] = rowIdProperties->metadataVersion;
rowIdObj["partitionId"] = rowIdProperties->partitionId;
rowIdObj["tableGuid"] = rowIdProperties->tableGuid;
obj["rowIdProperties"] = rowIdObj;
}

return obj;
Expand Down Expand Up @@ -118,8 +125,9 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
customSplitInfo[key.asString()] = value.asString();
}

std::shared_ptr<std::string> extraFileInfo =
std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::shared_ptr<std::string> extraFileInfo = obj["extraFileInfo"].isNull()
? nullptr
: std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::unordered_map<std::string, std::string> serdeParameters;
for (const auto& [key, value] : obj["serdeParameters"].items()) {
serdeParameters[key.asString()] = value.asString();
Expand All @@ -131,8 +139,8 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
}

std::optional<FileProperties> properties = std::nullopt;
const auto propertiesObj = obj["properties"];
if (!propertiesObj.isNull()) {
const auto& propertiesObj = obj.getDefault("properties", nullptr);
if (propertiesObj != nullptr) {
properties = FileProperties{
propertiesObj["fileSize"].isNull()
? std::nullopt
Expand All @@ -142,6 +150,15 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
: std::optional(propertiesObj["modificationTime"].asInt())};
}

std::optional<RowIdProperties> rowIdProperties = std::nullopt;
const auto& rowIdObj = obj.getDefault("rowIdProperties", nullptr);
if (rowIdObj != nullptr) {
rowIdProperties = RowIdProperties{
.metadataVersion = rowIdObj["metadataVersion"].asInt(),
.partitionId = rowIdObj["partitionId"].asInt(),
.tableGuid = rowIdObj["tableGuid"].asString()};
}

return std::make_shared<HiveConnectorSplit>(
connectorId,
filePath,
Expand All @@ -155,7 +172,8 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
serdeParameters,
splitWeight,
infoColumns,
properties);
properties,
rowIdProperties);
}

// static
Expand Down
6 changes: 4 additions & 2 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
const std::unordered_map<std::string, std::string>& _serdeParameters = {},
int64_t _splitWeight = 0,
const std::unordered_map<std::string, std::string>& _infoColumns = {},
std::optional<FileProperties> _properties = std::nullopt)
std::optional<FileProperties> _properties = std::nullopt,
std::optional<RowIdProperties> _rowIdProperties = std::nullopt)
: ConnectorSplit(connectorId, _splitWeight),
filePath(_filePath),
fileFormat(_fileFormat),
Expand All @@ -96,7 +97,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
extraFileInfo(_extraFileInfo),
serdeParameters(_serdeParameters),
infoColumns(_infoColumns),
properties(_properties) {}
properties(_properties),
rowIdProperties(_rowIdProperties) {}

std::string toString() const override;

Expand Down
30 changes: 27 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
ASSERT_EQ(value, clone->customSplitInfo.at(key));
}

ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
if (split.extraFileInfo != nullptr) {
ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
} else {
ASSERT_EQ(clone->extraFileInfo, nullptr);
}
ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size());
for (const auto& [key, value] : split.serdeParameters) {
ASSERT_EQ(value, clone->serdeParameters.at(key));
Expand Down Expand Up @@ -235,7 +239,9 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
FileProperties fileProperties{
.fileSize = 2048, .modificationTime = std::nullopt};
const auto properties = std::optional<FileProperties>(fileProperties);
const auto split = HiveConnectorSplit(
RowIdProperties rowIdProperties{
.metadataVersion = 2, .partitionId = 3, .tableGuid = "test"};
const auto split1 = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
Expand All @@ -248,8 +254,26 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
serdeParameters,
splitWeight,
infoColumns,
properties,
rowIdProperties);
testSerde(split1);

const auto split2 = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
start,
length,
{},
tableBucketNumber,
customSplitInfo,
nullptr,
{},
splitWeight,
{},
std::nullopt,
std::nullopt);
testSerde(split);
testSerde(split2);
}

} // namespace
Expand Down
29 changes: 24 additions & 5 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,12 @@ void Operator::maybeSetTracer() {
const auto opTraceDirPath = trace::getOpTraceDirectory(
traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId);
trace::createTraceDirectory(opTraceDirPath);
inputTracer_ = std::make_unique<trace::OperatorTraceWriter>(
this,
opTraceDirPath,
memory::traceMemoryPool(),
traceConfig->updateAndCheckTraceLimitCB);

if (operatorType() == "TableScan") {
setupSplitTracer(opTraceDirPath);
} else {
setupInputTracer(opTraceDirPath);
}
}

void Operator::traceInput(const RowVectorPtr& input) {
Expand All @@ -152,9 +153,14 @@ void Operator::traceInput(const RowVectorPtr& input) {
}

// Calls finish() on whichever tracer has been set up for this operator.
// An operator is expected to have at most one of the input tracer and the
// split tracer; having both set fails the check below.
void Operator::finishTrace() {
  VELOX_CHECK(inputTracer_ == nullptr || splitTracer_ == nullptr);

  if (auto* inputWriter = inputTracer_.get()) {
    inputWriter->finish();
  }
  if (auto* splitWriter = splitTracer_.get()) {
    splitWriter->finish();
  }
}

std::vector<std::unique_ptr<Operator::PlanNodeTranslator>>&
Expand All @@ -163,6 +169,19 @@ Operator::translators() {
return translators;
}

// Creates 'inputTracer_', which writes under 'opTraceDirPath' using the trace
// memory pool and the trace-limit callback taken from the driver's trace
// config.
void Operator::setupInputTracer(const std::string& opTraceDirPath) {
  const auto& traceConfig = operatorCtx_->driverCtx()->traceConfig();
  inputTracer_ = std::make_unique<trace::OperatorTraceInputWriter>(
      this,
      opTraceDirPath,
      memory::traceMemoryPool(),
      traceConfig->updateAndCheckTraceLimitCB);
}

// Creates 'splitTracer_', which writes under 'opTraceDirPath'.
void Operator::setupSplitTracer(const std::string& opTraceDirPath) {
  auto splitWriter =
      std::make_unique<trace::OperatorTraceSplitWriter>(this, opTraceDirPath);
  splitTracer_ = std::move(splitWriter);
}

// static
std::unique_ptr<Operator> Operator::fromPlanNode(
DriverCtx* ctx,
Expand Down
17 changes: 14 additions & 3 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -737,8 +737,8 @@ class Operator : public BaseRuntimeStatWriter {
return spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
}

/// Invoked to setup query data writer for this operator if the associated
/// query plan node is configured to collect trace.
/// Invoked to setup query data or split writer for this operator if the
/// associated query plan node is configured to collect trace.
void maybeSetTracer();

/// Creates output vector from 'input_' and 'results' according to
Expand Down Expand Up @@ -778,7 +778,12 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::OperatorTraceWriter> inputTracer_;

/// NOTE: at most one of the two tracers can be set for an operator.
/// 'splitTracer_' is currently only set for table scan to record the
/// processed splits.
std::unique_ptr<trace::OperatorTraceInputWriter> inputTracer_{nullptr};
std::unique_ptr<trace::OperatorTraceSplitWriter> splitTracer_{nullptr};

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand All @@ -803,6 +808,12 @@ class Operator : public BaseRuntimeStatWriter {

std::unordered_map<column_index_t, std::shared_ptr<common::Filter>>
dynamicFilters_;

private:
// Setup 'inputTracer_' to record the processed input vectors.
void setupInputTracer(const std::string& traceDir);
// Setup 'splitTracer_' for table scan to record the processed split.
void setupSplitTracer(const std::string& traceDir);
};

/// Given a row type returns indices for the specified subset of columns.
Expand Down
68 changes: 66 additions & 2 deletions velox/exec/OperatorTraceReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

#include <utility>

#include <folly/hash/Checksum.h>
#include "velox/common/file/FileInputStream.h"
#include "velox/exec/OperatorTraceReader.h"

#include "velox/exec/TraceUtil.h"

namespace facebook::velox::exec::trace {

OperatorTraceInputReader::OperatorTraceInputReader(
std::string traceDir,
RowTypePtr dataType,
Expand Down Expand Up @@ -84,4 +84,68 @@ OperatorTraceSummary OperatorTraceSummaryReader::read() const {
summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt();
return summary;
}

OperatorTraceSplitReader::OperatorTraceSplitReader(
std::vector<std::string> traceDirs,
memory::MemoryPool* pool)
: traceDirs_(std::move(traceDirs)),
fs_(filesystems::getFileSystem(traceDirs_[0], nullptr)),
pool_(pool) {
VELOX_CHECK_NOT_NULL(fs_);
}

std::vector<std::string> OperatorTraceSplitReader::read() const {
std::vector<std::string> splits;
for (const auto& traceDir : traceDirs_) {
auto stream = getSplitInputStream(traceDir);
if (stream == nullptr) {
continue;
}
auto curSplits = deserialize(stream.get());
splits.insert(
splits.end(),
std::make_move_iterator(curSplits.begin()),
std::make_move_iterator(curSplits.end()));
}
return splits;
}

// Opens the split info file located under 'traceDir' and wraps it in a file
// input stream backed by 'pool_'. Logs a warning and returns nullptr if the
// file is empty.
std::unique_ptr<common::FileInputStream>
OperatorTraceSplitReader::getSplitInputStream(
    const std::string& traceDir) const {
  auto file = fs_->openFileForRead(getOpTraceSplitFilePath(traceDir));
  if (file->size() != 0) {
    // TODO: Make the buffer size configurable.
    return std::make_unique<common::FileInputStream>(
        std::move(file), 1 << 20, pool_);
  }
  LOG(WARNING) << "Split info is empty in " << traceDir;
  return nullptr;
}

// static
//
// Decodes split records from 'stream' until it reports the end. Each record is
// read as a uint32 payload length, the payload bytes, and a uint32 crc32 of
// the payload. Decoding is best effort: a checksum mismatch or a
// VeloxException is logged and ends the loop, and the splits decoded up to
// that point are returned.
std::vector<std::string> OperatorTraceSplitReader::deserialize(
    common::FileInputStream* stream) {
  std::vector<std::string> decoded;
  try {
    while (!stream->atEnd()) {
      const auto payloadSize = stream->read<uint32_t>();
      std::string payload(payloadSize, '\0');
      stream->readBytes(
          reinterpret_cast<uint8_t*>(payload.data()), payloadSize);

      const auto storedCrc = stream->read<uint32_t>();
      const auto computedCrc = folly::crc32(
          reinterpret_cast<const uint8_t*>(payload.data()), payload.size());
      if (storedCrc == computedCrc) {
        decoded.push_back(std::move(payload));
        continue;
      }

      // The record is corrupted; give up on the rest of the stream.
      LOG(ERROR) << "Failed to verify the split checksum " << storedCrc
                 << " which does not equal to the actual computed checksum "
                 << computedCrc;
      break;
    }
  } catch (const VeloxException& e) {
    LOG(ERROR) << "Failed to deserialize split: " << e.message();
  }
  return decoded;
}
} // namespace facebook::velox::exec::trace
30 changes: 29 additions & 1 deletion velox/exec/OperatorTraceReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

#include "velox/common/file/FileInputStream.h"
#include "velox/common/file/FileSystems.h"
#include "velox/exec/Split.h"
#include "velox/exec/Trace.h"
#include "velox/serializers/PrestoSerializer.h"

namespace facebook::velox::exec::trace {

/// Used to read an operator trace input.
class OperatorTraceInputReader {
public:
Expand Down Expand Up @@ -67,4 +67,32 @@ class OperatorTraceSummaryReader {
memory::MemoryPool* const pool_;
const std::unique_ptr<ReadFile> summaryFile_;
};

/// Used to load the input splits from a set of traced 'TableScan' operators for
/// replay.
///
/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will
/// be extended to handle more types of splits, such as
/// 'IcebergHiveConnectorSplit'.
class OperatorTraceSplitReader {
public:
/// 'traceDirs' provides a list of directories with each one containing the
/// traced split info file for one table scan operator. 'pool' is passed to
/// the file input streams created for reading those files.
explicit OperatorTraceSplitReader(
std::vector<std::string> traceDirs,
memory::MemoryPool* pool);

/// Reads and deserializes all the traced split strings. The result
/// concatenates the splits of each trace directory, in the order of
/// 'traceDirs'.
std::vector<std::string> read() const;

private:
// Decodes the split strings stored in 'stream'.
static std::vector<std::string> deserialize(common::FileInputStream* stream);

// Returns an input stream over the split info file under 'traceDir'.
std::unique_ptr<common::FileInputStream> getSplitInputStream(
const std::string& traceDir) const;

// Trace directories to read from, as given to the constructor.
const std::vector<std::string> traceDirs_;
// File system used to open the split info files.
const std::shared_ptr<filesystems::FileSystem> fs_;
// Memory pool handed to the created file input streams.
memory::MemoryPool* const pool_;
};
} // namespace facebook::velox::exec::trace
Loading

0 comments on commit 12b52e7

Please sign in to comment.