Add QuerySplitTracer to record and rebuild splits
duanmeng committed Oct 13, 2024
1 parent b00751e commit 1e45029
Showing 11 changed files with 415 additions and 14 deletions.
8 changes: 5 additions & 3 deletions velox/connectors/hive/HiveConnectorSplit.cpp
@@ -61,7 +61,8 @@ folly::dynamic HiveConnectorSplit::serialize() const {
customSplitInfoObj[key] = value;
}
obj["customSplitInfo"] = customSplitInfoObj;
obj["extraFileInfo"] = *extraFileInfo;
obj["extraFileInfo"] =
extraFileInfo == nullptr ? nullptr : folly::dynamic(*extraFileInfo);

folly::dynamic serdeParametersObj = folly::dynamic::object;
for (const auto& [key, value] : serdeParameters) {
@@ -118,8 +119,9 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
customSplitInfo[key.asString()] = value.asString();
}

std::shared_ptr<std::string> extraFileInfo =
std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::shared_ptr<std::string> extraFileInfo = obj["extraFileInfo"].isNull()
? nullptr
: std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::unordered_map<std::string, std::string> serdeParameters;
for (const auto& [key, value] : obj["serdeParameters"].items()) {
serdeParameters[key.asString()] = value.asString();
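
The null handling above can be exercised in isolation. A minimal standalone sketch (not part of the commit; it only uses folly::dynamic and folly::json) of the serialize/deserialize round trip for a missing extraFileInfo:

#include <folly/dynamic.h>
#include <folly/json.h>

#include <cassert>
#include <memory>
#include <string>

int main() {
  // Serialize: a null extraFileInfo becomes a JSON null instead of
  // dereferencing a null shared_ptr.
  std::shared_ptr<std::string> extraFileInfo; // nullptr
  folly::dynamic obj = folly::dynamic::object;
  obj["extraFileInfo"] =
      extraFileInfo == nullptr ? nullptr : folly::dynamic(*extraFileInfo);

  // Deserialize: a JSON null maps back to a null shared_ptr.
  const auto parsed = folly::parseJson(folly::toJson(obj));
  const auto restored = parsed["extraFileInfo"].isNull()
      ? nullptr
      : std::make_shared<std::string>(parsed["extraFileInfo"].asString());
  assert(restored == nullptr);
  return 0;
}
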
26 changes: 23 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
@@ -81,7 +81,11 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
ASSERT_EQ(value, clone->customSplitInfo.at(key));
}

ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
if (split.extraFileInfo != nullptr) {
ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
} else {
ASSERT_EQ(clone->extraFileInfo, nullptr);
}
ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size());
for (const auto& [key, value] : split.serdeParameters) {
ASSERT_EQ(value, clone->serdeParameters.at(key));
@@ -216,7 +220,7 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
FileProperties fileProperties{
.fileSize = 2048, .modificationTime = std::nullopt};
const auto properties = std::optional<FileProperties>(fileProperties);
const auto split = HiveConnectorSplit(
const auto split1 = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
@@ -229,8 +233,24 @@
serdeParameters,
splitWeight,
infoColumns,
properties);
testSerde(split1);

const auto split2 = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
start,
length,
{},
tableBucketNumber,
customSplitInfo,
nullptr,
{},
splitWeight,
{},
std::nullopt);
testSerde(split);
testSerde(split2);
}

} // namespace
2 changes: 2 additions & 0 deletions velox/exec/CMakeLists.txt
@@ -61,6 +61,8 @@ velox_add_library(
QueryDataWriter.cpp
QueryMetadataReader.cpp
QueryMetadataWriter.cpp
QuerySplitReader.cpp
QuerySplitWriter.cpp
QueryTraceConfig.cpp
QueryTraceScan.cpp
QueryTraceUtil.cpp
11 changes: 6 additions & 5 deletions velox/exec/QueryDataWriter.cpp
@@ -26,15 +26,16 @@
namespace facebook::velox::exec::trace {

QueryDataWriter::QueryDataWriter(
std::string path,
std::string traceDir,
memory::MemoryPool* pool,
UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB)
: dirPath_(std::move(path)),
fs_(filesystems::getFileSystem(dirPath_, nullptr)),
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)),
pool_(pool),
updateAndCheckTraceLimitCB_(std::move(updateAndCheckTraceLimitCB)) {
VELOX_CHECK_NOT_NULL(fs_);
dataFile_ = fs_->openFileForWrite(
fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataFileName));
fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataFileName));
VELOX_CHECK_NOT_NULL(dataFile_);
}

@@ -83,7 +84,7 @@ void QueryDataWriter::finish(bool limitExceeded) {

void QueryDataWriter::writeSummary(bool limitExceeded) const {
const auto summaryFilePath =
fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataSummaryFileName);
fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataSummaryFileName);
const auto file = fs_->openFileForWrite(summaryFilePath);
folly::dynamic obj = folly::dynamic::object;
obj[QueryTraceTraits::kDataTypeKey] = dataType_->serialize();
9 changes: 6 additions & 3 deletions velox/exec/QueryDataWriter.h
@@ -30,11 +30,14 @@ namespace facebook::velox::exec::trace {
class QueryDataWriter {
public:
explicit QueryDataWriter(
std::string path,
std::string traceDir,
memory::MemoryPool* pool,
UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB);

/// Serializes rows and writes out each batch.
/// Serializes and writes out each batch, enabling us to replay the execution
/// with the same batch numbers and order. Each serialized batch is flushed
/// immediately, ensuring that the traced operator can be replayed even if a
/// crash occurs during execution.
void write(const RowVectorPtr& rows);

/// Closes the data file and writes out the data summary.
@@ -49,7 +52,7 @@ class QueryDataWriter {
// TODO: add more summaries such as number of rows etc.
void writeSummary(bool limitExceeded = false) const;

const std::string dirPath_;
const std::string traceDir_;
// TODO: make 'useLosslessTimestamp' configurable.
const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = {
true,
104 changes: 104 additions & 0 deletions velox/exec/QuerySplitReader.cpp
@@ -0,0 +1,104 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/QuerySplitReader.h"

#include <folly/hash/Checksum.h>

#include "velox/common/file/FileInputStream.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/QueryTraceTraits.h"
#include "velox/exec/QueryTraceUtil.h"

using namespace facebook::velox::connector::hive;

namespace facebook::velox::exec::trace {

QuerySplitReader::QuerySplitReader(
std::string traceDir,
memory::MemoryPool* pool)
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)),
pool_(pool),
splitInfoStream_(getSplitInputStream()) {
VELOX_CHECK_NOT_NULL(fs_);
VELOX_CHECK_NOT_NULL(splitInfoStream_);
}

std::vector<exec::Split> QuerySplitReader::read() const {
const auto splitStrings = getSplitInfos(splitInfoStream_.get());
std::vector<exec::Split> splits;
for (const auto& splitString : splitStrings) {
folly::dynamic splitInfoObj = folly::parseJson(splitString);
const auto split =
ISerializable::deserialize<HiveConnectorSplit>(splitInfoObj);
splits.emplace_back(
std::make_shared<HiveConnectorSplit>(
split->connectorId,
split->filePath,
split->fileFormat,
split->start,
split->length,
split->partitionKeys,
split->tableBucketNumber,
split->customSplitInfo,
split->extraFileInfo,
split->serdeParameters,
split->splitWeight,
split->infoColumns,
split->properties),
-1);
}
return splits;
}

std::unique_ptr<common::FileInputStream> QuerySplitReader::getSplitInputStream()
const {
auto splitInfoFile = fs_->openFileForRead(
fmt::format("{}/{}", traceDir_, QueryTraceTraits::kSplitInfoFileName));
// TODO: Make the buffer size configurable.
return std::make_unique<common::FileInputStream>(
std::move(splitInfoFile), 1 << 20, pool_);
}

// static
std::vector<std::string> QuerySplitReader::getSplitInfos(
common::FileInputStream* stream) {
std::vector<std::string> splits;
try {
while (!stream->atEnd()) {
const auto length = stream->read<uint32_t>();
std::string splitInfoString(length, '\0');
stream->readBytes(
reinterpret_cast<uint8_t*>(splitInfoString.data()), length);
const auto crc32 = stream->read<uint32_t>();
const auto actualCrc32 = folly::crc32(
reinterpret_cast<const uint8_t*>(splitInfoString.data()),
splitInfoString.size());
if (crc32 != actualCrc32) {
LOG(ERROR) << "Failed to verify the checksum: " << crc32
<< " does not equal the actual checksum " << actualCrc32;
break;
}
splits.push_back(std::move(splitInfoString));
}
} catch (const VeloxException& e) {
LOG(ERROR) << "Failed to deserialize split string from the stream: "
<< e.message();
}
return splits;
}
} // namespace facebook::velox::exec::trace
50 changes: 50 additions & 0 deletions velox/exec/QuerySplitReader.h
@@ -0,0 +1,50 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <re2/re2.h>
#include "velox/common/file/FileInputStream.h"
#include "velox/common/file/FileSystems.h"
#include "velox/exec/Split.h"

namespace facebook::velox::exec::trace {
/// Used to load the input splits from a tracing 'TableScan'
/// operator, and for getting the traced splits when replaying 'TableScan'.
///
/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will
/// be extended to handle more types of splits, such as
/// 'IcebergHiveConnectorSplit'.
class QuerySplitReader {
public:
explicit QuerySplitReader(std::string traceDir, memory::MemoryPool* pool);

/// Reads from 'splitInfoStream_' and deserializes to 'splitInfos'. Returns
/// all the correctly traced splits.
std::vector<exec::Split> read() const;

private:
static std::vector<std::string> getSplitInfos(
common::FileInputStream* stream);

std::unique_ptr<common::FileInputStream> getSplitInputStream() const;

const std::string traceDir_;
const std::shared_ptr<filesystems::FileSystem> fs_;
memory::MemoryPool* const pool_;
const std::unique_ptr<common::FileInputStream> splitInfoStream_;
};
} // namespace facebook::velox::exec::trace
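
A hypothetical usage sketch (not part of the commit) of how the rebuilt splits could be fed back to a replayed 'TableScan'. Task::addSplit and Task::noMoreSplits are the standard Velox task APIs; the helper name, traceDir, and scanNodeId are illustrative assumptions:

#include "velox/exec/QuerySplitReader.h"
#include "velox/exec/Task.h"

namespace facebook::velox::exec::trace {

// Hypothetical helper: reads every recorded split from 'traceDir' and queues
// it on the table scan node of the replaying task.
void addTracedSplits(
    const std::string& traceDir,
    memory::MemoryPool* pool,
    const std::shared_ptr<exec::Task>& task,
    const core::PlanNodeId& scanNodeId) {
  QuerySplitReader reader(traceDir, pool);
  auto splits = reader.read();
  for (auto& split : splits) {
    task->addSplit(scanNodeId, std::move(split));
  }
  task->noMoreSplits(scanNodeId);
}

} // namespace facebook::velox::exec::trace
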
79 changes: 79 additions & 0 deletions velox/exec/QuerySplitWriter.cpp
@@ -0,0 +1,79 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/QuerySplitWriter.h"

#include <folly/hash/Checksum.h>
#include <folly/io/Cursor.h>

#include "QueryTraceUtil.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/QueryTraceTraits.h"

using namespace facebook::velox::connector::hive;

namespace facebook::velox::exec::trace {
/// Used to record and load the input splits from a tracing 'TableScan'
/// operator, and for getting the traced splits when replaying 'TableScan'.
///
/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will
/// be extended to handle more types of splits, such as
/// 'IcebergHiveConnectorSplit'.
QuerySplitWriter::QuerySplitWriter(std::string traceDir)
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)) {
VELOX_CHECK_NOT_NULL(fs_);
splitInfoFile_ = fs_->openFileForWrite(
fmt::format("{}/{}", traceDir_, QueryTraceTraits::kSplitInfoFileName));
VELOX_CHECK_NOT_NULL(splitInfoFile_);
}

void QuerySplitWriter::write(const exec::Split& split) const {
VELOX_CHECK(!split.hasGroup(), "Do not support grouped execution");
VELOX_CHECK(split.hasConnectorSplit());
const auto splitObj = split.connectorSplit->serialize();
const auto splitJson = folly::toJson(splitObj);
auto ioBuf = appendToBuffer(splitJson);
splitInfoFile_->append(std::move(ioBuf));
}

void QuerySplitWriter::finish() {
if (finished_) {
return;
}

VELOX_CHECK_NOT_NULL(
splitInfoFile_, "The query split writer has already been finished");
splitInfoFile_->close();
splitInfoFile_.reset();
finished_ = true;
}

// static
std::unique_ptr<folly::IOBuf> QuerySplitWriter::appendToBuffer(
const std::string& split) {
const uint32_t length = split.length();
const uint32_t crc32 = folly::crc32(
reinterpret_cast<const uint8_t*>(split.data()), split.size());
auto ioBuf =
folly::IOBuf::create(sizeof(length) + split.size() + sizeof(crc32));
folly::io::Appender appender(ioBuf.get(), 0);
appender.writeLE(length);
appender.push(reinterpret_cast<const uint8_t*>(split.data()), length);
appender.writeLE(crc32);
return ioBuf;
}
} // namespace facebook::velox::exec::trace
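
Each call to write() therefore appends one record of the form [uint32 length (LE)][JSON payload][uint32 crc32 (LE)] to the split-info file, and getSplitInfos() walks those records back and stops at the first one whose checksum does not match. A standalone sketch (assumption: folly only, no Velox types) that frames and re-reads a single record the same way:

#include <folly/hash/Checksum.h>
#include <folly/io/Cursor.h>
#include <folly/io/IOBuf.h>

#include <cassert>
#include <string>

int main() {
  const std::string payload = R"({"filePath":"/tmp/t.orc"})"; // example JSON

  // Frame the record: length, payload bytes, checksum, all in one IOBuf.
  const uint32_t length = payload.size();
  const uint32_t crc32 = folly::crc32(
      reinterpret_cast<const uint8_t*>(payload.data()), payload.size());
  auto buf =
      folly::IOBuf::create(sizeof(length) + payload.size() + sizeof(crc32));
  folly::io::Appender appender(buf.get(), 0);
  appender.writeLE(length);
  appender.push(reinterpret_cast<const uint8_t*>(payload.data()), length);
  appender.writeLE(crc32);

  // Read the record back and verify the checksum, as the reader does.
  folly::io::Cursor cursor(buf.get());
  const auto readLength = cursor.readLE<uint32_t>();
  std::string restored(readLength, '\0');
  cursor.pull(restored.data(), readLength);
  const auto readCrc32 = cursor.readLE<uint32_t>();
  assert(
      readCrc32 ==
      folly::crc32(
          reinterpret_cast<const uint8_t*>(restored.data()), restored.size()));
  assert(restored == payload);
  return 0;
}
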