Skip to content

Commit

Permalink
Add TableScanReplayer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Oct 30, 2024
1 parent df5cff5 commit cbe9dee
Show file tree
Hide file tree
Showing 22 changed files with 843 additions and 28 deletions.
8 changes: 5 additions & 3 deletions velox/connectors/hive/HiveConnectorSplit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ folly::dynamic HiveConnectorSplit::serialize() const {
customSplitInfoObj[key] = value;
}
obj["customSplitInfo"] = customSplitInfoObj;
obj["extraFileInfo"] = *extraFileInfo;
obj["extraFileInfo"] =
extraFileInfo == nullptr ? nullptr : folly::dynamic(*extraFileInfo);

folly::dynamic serdeParametersObj = folly::dynamic::object;
for (const auto& [key, value] : serdeParameters) {
Expand Down Expand Up @@ -118,8 +119,9 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
customSplitInfo[key.asString()] = value.asString();
}

std::shared_ptr<std::string> extraFileInfo =
std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::shared_ptr<std::string> extraFileInfo = obj["extraFileInfo"].isNull()
? nullptr
: std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::unordered_map<std::string, std::string> serdeParameters;
for (const auto& [key, value] : obj["serdeParameters"].items()) {
serdeParameters[key.asString()] = value.asString();
Expand Down
26 changes: 23 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
ASSERT_EQ(value, clone->customSplitInfo.at(key));
}

ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
if (split.extraFileInfo != nullptr) {
ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
} else {
ASSERT_EQ(clone->extraFileInfo, nullptr);
}
ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size());
for (const auto& [key, value] : split.serdeParameters) {
ASSERT_EQ(value, clone->serdeParameters.at(key));
Expand Down Expand Up @@ -235,7 +239,7 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
FileProperties fileProperties{
.fileSize = 2048, .modificationTime = std::nullopt};
const auto properties = std::optional<FileProperties>(fileProperties);
const auto split = HiveConnectorSplit(
const auto split1 = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
Expand All @@ -248,8 +252,24 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
serdeParameters,
splitWeight,
infoColumns,
properties);
testSerde(split1);

const auto split2 = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
start,
length,
{},
tableBucketNumber,
customSplitInfo,
nullptr,
{},
splitWeight,
{},
std::nullopt);
testSerde(split);
testSerde(split2);
}

} // namespace
Expand Down
24 changes: 19 additions & 5 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,12 @@ void Operator::maybeSetTracer() {
const auto opTraceDirPath = trace::getOpTraceDirectory(
traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId);
trace::createTraceDirectory(opTraceDirPath);
inputTracer_ = std::make_unique<trace::OperatorTraceWriter>(
this,
opTraceDirPath,
memory::traceMemoryPool(),
traceConfig->updateAndCheckTraceLimitCB);

if (operatorType() == "TableScan") {
setupSplitTracer(opTraceDirPath);
} else {
setupInputTracer(opTraceDirPath);
}
}

void Operator::traceInput(const RowVectorPtr& input) {
Expand All @@ -159,6 +160,19 @@ Operator::translators() {
return translators;
}

// Creates 'inputTracer_', the writer that records the input vectors processed
// by this operator under 'opTraceDirPath'. The writer is given the trace memory
// pool and the trace-limit callback taken from the driver's trace config.
void Operator::setupInputTracer(const std::string& opTraceDirPath) {
  // The caller is expected to have verified that tracing is configured;
  // the trace config is dereferenced here without a further check.
  const auto& traceConfig = operatorCtx_->driverCtx()->traceConfig();
  inputTracer_ = std::make_unique<trace::OperatorTraceInputWriter>(
      this,
      opTraceDirPath,
      memory::traceMemoryPool(),
      traceConfig->updateAndCheckTraceLimitCB);
}

// Creates 'splitTracer_', the writer that records the splits processed by this
// operator under 'opTraceDirPath'.
void Operator::setupSplitTracer(const std::string& opTraceDirPath) {
  auto splitWriter =
      std::make_unique<trace::OperatorTraceSplitWriter>(opTraceDirPath);
  splitTracer_ = std::move(splitWriter);
}

// static
std::unique_ptr<Operator> Operator::fromPlanNode(
DriverCtx* ctx,
Expand Down
17 changes: 14 additions & 3 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,8 @@ class Operator : public BaseRuntimeStatWriter {
return spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
}

/// Invoked to setup query data writer for this operator if the associated
/// query plan node is configured to collect trace.
/// Invoked to setup query data or split writer for this operator if the
/// associated query plan node is configured to collect trace.
void maybeSetTracer();

/// Creates output vector from 'input_' and 'results' according to
Expand Down Expand Up @@ -774,7 +774,12 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::OperatorTraceWriter> inputTracer_;

/// NOTE: only one of the two can be set for an operator for tracing.
/// 'splitTracer_' is only set for table scan to record the processed splits
/// for now.
std::unique_ptr<trace::OperatorTraceInputWriter> inputTracer_{nullptr};
std::unique_ptr<trace::OperatorTraceSplitWriter> splitTracer_{nullptr};

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand All @@ -799,6 +804,12 @@ class Operator : public BaseRuntimeStatWriter {

std::unordered_map<column_index_t, std::shared_ptr<common::Filter>>
dynamicFilters_;

private:
// Setup 'inputTracer_' to record the processed input vectors.
void setupInputTracer(const std::string& traceDir);
// Setup 'splitTracer_' for table scan to record the processed split.
void setupSplitTracer(const std::string& traceDir);
};

/// Given a row type returns indices for the specified subset of columns.
Expand Down
87 changes: 85 additions & 2 deletions velox/exec/OperatorTraceReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

#include <utility>

#include <folly/hash/Checksum.h>
#include "velox/common/file/FileInputStream.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/OperatorTraceReader.h"

#include "velox/exec/Trace.h"
#include "velox/exec/TraceUtil.h"

namespace facebook::velox::exec::trace {
using namespace facebook::velox::connector::hive;

namespace facebook::velox::exec::trace {
OperatorTraceInputReader::OperatorTraceInputReader(
std::string traceDir,
RowTypePtr dataType,
Expand Down Expand Up @@ -75,4 +79,83 @@ OperatorTraceSummary OperatorTraceSummaryReader::read() const {
summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt();
return summary;
}

OperatorTraceSplitReader::OperatorTraceSplitReader(
std::vector<std::string> traceDirs,
memory::MemoryPool* pool)
: traceDirs_(std::move(traceDirs)),
fs_(filesystems::getFileSystem(traceDirs_[0], nullptr)),
pool_(pool) {
VELOX_CHECK_NOT_NULL(fs_);
}

std::vector<exec::Split> OperatorTraceSplitReader::read() const {
std::vector<exec::Split> splits;
for (const auto& traceDir : traceDirs_) {
auto splitInfoStream = getSplitInputStream(traceDir);
if (splitInfoStream == nullptr) {
continue;
}
const auto splitStrs = getSplitInfos(splitInfoStream.get());
for (const auto& splitStr : splitStrs) {
folly::dynamic splitInfoObj = folly::parseJson(splitStr);
const auto split =
ISerializable::deserialize<HiveConnectorSplit>(splitInfoObj);
splits.emplace_back(std::make_shared<HiveConnectorSplit>(
split->connectorId,
split->filePath,
split->fileFormat,
split->start,
split->length,
split->partitionKeys,
split->tableBucketNumber,
split->customSplitInfo,
split->extraFileInfo,
split->serdeParameters,
split->splitWeight,
split->infoColumns,
split->properties));
}
}
return splits;
}

/// Opens the split info file under 'traceDir' and wraps it in an input stream
/// backed by 'pool_'. Returns nullptr, after logging a warning, if the file is
/// empty.
std::unique_ptr<common::FileInputStream>
OperatorTraceSplitReader::getSplitInputStream(
    const std::string& traceDir) const {
  // TODO: Make the buffer size configurable.
  constexpr int kReadBufferSize = 1 << 20;

  const auto splitFilePath = getOpTraceSplitFilePath(traceDir);
  auto file = fs_->openFileForRead(splitFilePath);
  if (file->size() != 0) {
    return std::make_unique<common::FileInputStream>(
        std::move(file), kReadBufferSize, pool_);
  }
  LOG(WARNING) << "Split info is empty in " << traceDir;
  return nullptr;
}

// static
/// Reads all the split records from 'stream'. Each record is laid out as a
/// uint32 payload length, the payload bytes and a uint32 crc32 of the payload.
/// Reading stops at the first record whose checksum does not match, or when a
/// VeloxException is raised by the stream; in both cases the error is logged
/// and the records read so far are returned.
std::vector<std::string> OperatorTraceSplitReader::getSplitInfos(
    common::FileInputStream* stream) {
  std::vector<std::string> splitInfos;
  try {
    while (!stream->atEnd()) {
      const auto payloadSize = stream->read<uint32_t>();
      std::string payload(payloadSize, '\0');
      stream->readBytes(
          reinterpret_cast<uint8_t*>(payload.data()), payloadSize);

      const auto expectedCrc32 = stream->read<uint32_t>();
      const auto actualCrc32 = folly::crc32(
          reinterpret_cast<const uint8_t*>(payload.data()), payload.size());
      if (expectedCrc32 == actualCrc32) {
        splitInfos.push_back(std::move(payload));
        continue;
      }

      // A corrupted record makes the rest of the stream untrustworthy.
      LOG(ERROR) << "Failed to verify the split checksum " << expectedCrc32
                 << " which does not equal to the actual computed checksum "
                 << actualCrc32;
      break;
    }
  } catch (const VeloxException& e) {
    LOG(ERROR) << "Failed to deserialize split: " << e.message();
  }
  return splitInfos;
}
} // namespace facebook::velox::exec::trace
32 changes: 31 additions & 1 deletion velox/exec/OperatorTraceReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

#include "velox/common/file/FileInputStream.h"
#include "velox/common/file/FileSystems.h"
#include "velox/exec/Split.h"
#include "velox/exec/Trace.h"
#include "velox/serializers/PrestoSerializer.h"

namespace facebook::velox::exec::trace {

/// Used to read an operator trace input.
class OperatorTraceInputReader {
public:
Expand Down Expand Up @@ -66,4 +66,34 @@ class OperatorTraceSummaryReader {
memory::MemoryPool* const pool_;
const std::unique_ptr<ReadFile> summaryFile_;
};

/// Used to load the input splits from a set of traced 'TableScan' operators for
/// replay.
///
/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will
/// be extended to handle more types of splits, such as
/// 'IcebergHiveConnectorSplit'.
class OperatorTraceSplitReader {
public:
/// 'traceDirs' provides a list of directories with each one containing the
/// traced split info file for one table scan operator. The file system is
/// resolved from the first directory, so 'traceDirs' must not be empty.
/// 'pool' is used by the input streams created to read the split info files.
explicit OperatorTraceSplitReader(
std::vector<std::string> traceDirs,
memory::MemoryPool* pool);

/// Reads the split info file under each directory in 'traceDirs_' and
/// deserializes every recorded entry into a 'HiveConnectorSplit'. Returns all
/// the traced splits. Directories with an empty split info file are skipped.
std::vector<exec::Split> read() const;

private:
// Reads the serialized split records from 'stream'. Each record consists of
// a uint32 length, the payload and a uint32 crc32 checksum of the payload.
// Stops at the first record that fails checksum verification and returns the
// records read up to that point.
static std::vector<std::string> getSplitInfos(
common::FileInputStream* stream);

// Opens the split info file under 'traceDir' for read. Returns nullptr if the
// file is empty.
std::unique_ptr<common::FileInputStream> getSplitInputStream(
const std::string& traceDir) const;

// The directories to read the traced split info files from.
const std::vector<std::string> traceDirs_;
// The file system used to open the split info files.
const std::shared_ptr<filesystems::FileSystem> fs_;
// The memory pool passed to the split info input streams.
memory::MemoryPool* const pool_;
};
} // namespace facebook::velox::exec::trace
55 changes: 51 additions & 4 deletions velox/exec/OperatorTraceWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@

#include "velox/exec/OperatorTraceWriter.h"

#include <folly/hash/Checksum.h>
#include <folly/io/Cursor.h>
#include <utility>

#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/Operator.h"
#include "velox/exec/Trace.h"
#include "velox/exec/TraceUtil.h"

namespace facebook::velox::exec::trace {

OperatorTraceWriter::OperatorTraceWriter(
OperatorTraceInputWriter::OperatorTraceInputWriter(
Operator* traceOp,
std::string traceDir,
memory::MemoryPool* pool,
Expand All @@ -39,7 +43,7 @@ OperatorTraceWriter::OperatorTraceWriter(
VELOX_CHECK_NOT_NULL(traceFile_);
}

void OperatorTraceWriter::write(const RowVectorPtr& rows) {
void OperatorTraceInputWriter::write(const RowVectorPtr& rows) {
if (FOLLY_UNLIKELY(finished_)) {
return;
}
Expand All @@ -66,7 +70,7 @@ void OperatorTraceWriter::write(const RowVectorPtr& rows) {
traceFile_->append(std::move(iobuf));
}

void OperatorTraceWriter::finish() {
void OperatorTraceInputWriter::finish() {
if (finished_) {
return;
}
Expand All @@ -80,7 +84,7 @@ void OperatorTraceWriter::finish() {
finished_ = true;
}

void OperatorTraceWriter::writeSummary() const {
void OperatorTraceInputWriter::writeSummary() const {
const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_);
const auto file = fs_->openFileForWrite(summaryFilePath);
folly::dynamic obj = folly::dynamic::object;
Expand All @@ -96,4 +100,47 @@ void OperatorTraceWriter::writeSummary() const {
file->close();
}

/// Constructs a split writer that records into the split info file under
/// 'traceDir'. Resolves the file system from 'traceDir' and opens the split
/// info file for write; both must succeed.
OperatorTraceSplitWriter::OperatorTraceSplitWriter(std::string traceDir)
    : traceDir_(std::move(traceDir)),
      fs_(filesystems::getFileSystem(traceDir_, nullptr)) {
  VELOX_CHECK_NOT_NULL(fs_);

  const auto splitFilePath = getOpTraceSplitFilePath(traceDir_);
  splitFile_ = fs_->openFileForWrite(splitFilePath);
  VELOX_CHECK_NOT_NULL(splitFile_);
}

/// Serializes the connector split of 'split' to JSON and appends it to the
/// split info file as one length-prefixed, checksummed record.
///
/// 'split' must carry a connector split and must not belong to a split group.
/// Must not be called after finish(): finish() releases the split file, so a
/// late write fails a check here instead of dereferencing a null file handle.
void OperatorTraceSplitWriter::write(const exec::Split& split) const {
  VELOX_CHECK(!finished_, "The split trace writer has already been finished");
  VELOX_CHECK(!split.hasGroup(), "Do not support grouped execution");
  VELOX_CHECK(split.hasConnectorSplit());
  const auto splitObj = split.connectorSplit->serialize();
  const auto splitJson = folly::toJson(splitObj);
  auto ioBuf = serialize(splitJson);
  splitFile_->append(std::move(ioBuf));
}

/// Closes and releases the split info file. Only the first call has an
/// effect; later calls return immediately.
void OperatorTraceSplitWriter::finish() {
  if (!finished_) {
    VELOX_CHECK_NOT_NULL(
        splitFile_, "The query data writer has already been finished");
    splitFile_->close();
    splitFile_.reset();
    finished_ = true;
  }
}

// static
/// Encodes 'split' as one record: a little-endian uint32 length, the raw bytes
/// of 'split' and a little-endian uint32 crc32 of those bytes. The returned
/// buffer is allocated with exactly the capacity the record requires.
std::unique_ptr<folly::IOBuf> OperatorTraceSplitWriter::serialize(
    const std::string& split) {
  const auto* payload = reinterpret_cast<const uint8_t*>(split.data());
  const uint32_t length = split.length();
  const uint32_t checksum = folly::crc32(payload, split.size());

  auto ioBuf =
      folly::IOBuf::create(sizeof(length) + split.size() + sizeof(checksum));
  // A growth of 0 keeps the appender within the pre-sized buffer.
  folly::io::Appender appender(ioBuf.get(), 0);
  appender.writeLE(length);
  appender.push(payload, length);
  appender.writeLE(checksum);
  return ioBuf;
}
} // namespace facebook::velox::exec::trace
Loading

0 comments on commit cbe9dee

Please sign in to comment.