Skip to content

Commit

Permalink
Add file system error injection in table writer fuzzer
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 committed Nov 6, 2024
1 parent 7f3588e commit 4f24a84
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 17 deletions.
5 changes: 2 additions & 3 deletions velox/common/file/tests/FaultyFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
namespace facebook::velox::tests::utils {

using namespace filesystems;

/// Implements faulty filesystem for io fault injection in unit test. It is a
/// wrapper on top of a real file system, and by default it delegates the the
/// file operation to the real file system underneath.
Expand Down Expand Up @@ -55,11 +54,11 @@ class FaultyFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options) override;
const FileOptions& options = {}) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
const FileOptions& options) override;
const FileOptions& options = {}) override;

void remove(std::string_view path) override;

Expand Down
14 changes: 12 additions & 2 deletions velox/dwio/common/FileSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,22 @@ void WriteFileSink::doClose() {

LocalFileSink::LocalFileSink(const std::string& name, const Options& options)
: FileSink{name, options}, writeFile_() {
LocalFileSink(name, options, true);
}

LocalFileSink::LocalFileSink(
const std::string& name,
const Options& options,
bool initializeWriter)
: FileSink{name, options}, writeFile_() {
const auto dir = fs::path(name_).parent_path();
if (!fs::exists(dir)) {
VELOX_CHECK(velox::common::generateFileDirectory(dir.c_str()));
}
auto fs = filesystems::getFileSystem(name_, nullptr);
writeFile_ = fs->openFileForWrite(name_);
if (initializeWriter) {
auto fs = filesystems::getFileSystem(name_, nullptr);
writeFile_ = fs->openFileForWrite(name_);
}
}

void LocalFileSink::doClose() {
Expand Down
7 changes: 6 additions & 1 deletion velox/dwio/common/FileSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,14 @@ class LocalFileSink : public FileSink {
static void registerFactory();

protected:
// 'initializeWriter' is false if it is used by FaultyFileSink which setups
// the write file through the fault filesystem.
LocalFileSink(
const std::string& name,
const Options& options,
bool initializeWriter);
void doClose() override;

private:
std::unique_ptr<WriteFile> writeFile_;
};

Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/common/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# limitations under the License.

add_subdirectory(utils)
velox_add_library(velox_dwio_faulty_file_sink FaultyFileSink.cpp)
velox_link_libraries(velox_dwio_faulty_file_sink velox_file_test_utils
velox_dwio_common)

add_executable(
velox_dwio_common_test
Expand Down
51 changes: 51 additions & 0 deletions velox/dwio/common/tests/FaultyFileSink.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/common/tests/FaultyFileSink.h"
#include "velox/common/base/Fs.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/file/tests/FaultyFileSystem.h"
#include "velox/dwio/common/FileSink.h"
#include "velox/dwio/common/exception/Exception.h"

namespace facebook::velox::dwio::common {
namespace {
using tests::utils::FaultyFileSystem;

std::unique_ptr<FileSink> createFaultyFileSink(
const std::string& filePath,
const FileSink::Options& options) {
if (filePath.find("faulty:") == 0) {
return std::make_unique<FaultyFileSink>(filePath, options);
}
return nullptr;
}
} // namespace

FaultyFileSink::FaultyFileSink(
const std::string& faultyFilePath,
const Options& options)
: LocalFileSink{faultyFilePath.substr(7), options, false},
faultyFilePath_(faultyFilePath) {
auto fs = filesystems::getFileSystem(faultyFilePath_, nullptr);
writeFile_ = fs->openFileForWrite(faultyFilePath_);
}

void registerFaultyFileSinks() {
facebook::velox::dwio::common::FileSink::registerFactory(
(createFaultyFileSink));
}
} // namespace facebook::velox::dwio::common
43 changes: 43 additions & 0 deletions velox/dwio/common/tests/FaultyFileSink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <chrono>

#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/common/file/tests/FaultyFile.h"
#include "velox/common/io/IoStatistics.h"
#include "velox/dwio/common/Closeable.h"
#include "velox/dwio/common/DataBuffer.h"
#include "velox/dwio/common/FileSink.h"
#include "velox/dwio/common/MetricsLog.h"

namespace facebook::velox::dwio::common {
using namespace facebook::velox::io;

class FaultyFileSink : public LocalFileSink {
public:
FaultyFileSink(const std::string& faultyFilePath, const Options& options);

private:
const std::string faultyFilePath_;
};

void registerFaultyFileSinks();

} // namespace facebook::velox::dwio::common
7 changes: 5 additions & 2 deletions velox/exec/fuzzer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ target_link_libraries(
velox_hive_connector
velox_dwio_dwrf_reader
velox_dwio_dwrf_writer
velox_dwio_catalog_fbhive)
velox_dwio_catalog_fbhive
velox_dwio_faulty_file_sink)

add_library(velox_aggregation_fuzzer_base AggregationFuzzerBase.cpp)

Expand Down Expand Up @@ -102,7 +103,9 @@ target_link_libraries(
velox_exec_test_lib
velox_expression_test_utility
velox_temp_path
velox_vector_test_lib)
velox_vector_test_lib
velox_dwio_faulty_file_sink
velox_file_test_utils)

add_library(velox_memory_arbitration_fuzzer MemoryArbitrationFuzzer.cpp)

Expand Down
59 changes: 50 additions & 9 deletions velox/exec/fuzzer/WriterFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/common/base/Fs.h"
#include "velox/common/encode/Base64.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/file/tests/FaultyFileSystem.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/TableHandle.h"
Expand All @@ -36,6 +37,11 @@
#include "velox/vector/VectorSaver.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"

DEFINE_bool(
file_system_error_injection,
true,
"When enabled, inject file system write error with certain possibility");

DEFINE_int32(steps, 10, "Number of plans to generate and test.");

DEFINE_int32(
Expand Down Expand Up @@ -63,6 +69,9 @@ using namespace facebook::velox::test;
namespace facebook::velox::exec::test {

namespace {
using facebook::velox::filesystems::FileSystem;
using tests::utils::FaultFileOperation;
using tests::utils::FaultyFileSystem;

class WriterFuzzer {
public:
Expand Down Expand Up @@ -123,7 +132,7 @@ class WriterFuzzer {
const std::vector<std::string>& bucketColumns,
int32_t sortColumnOffset,
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
const std::string& outputDirectoryPath);
const std::shared_ptr<TempDirectoryPath>& outputDirectoryPath);

// Generates table column handles based on table column properties
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
Expand Down Expand Up @@ -244,6 +253,12 @@ class WriterFuzzer {
std::shared_ptr<memory::MemoryPool> writerPool_{
rootPool_->addAggregateChild("writerFuzzerWriter")};
VectorFuzzer vectorFuzzer_;

const std::shared_ptr<FaultyFileSystem> faultyFs_ =
std::dynamic_pointer_cast<FaultyFileSystem>(
filesystems::getFileSystem("faulty:/tmp", {}));
std::atomic<uint64_t> injectedErrorCount_{0};
const std::string injectedErrorMsg_ = "Injected Faulty File Error";
};
} // namespace

Expand Down Expand Up @@ -292,6 +307,16 @@ void WriterFuzzer::go() {
auto startTime = std::chrono::system_clock::now();
size_t iteration = 0;

// Faulty fs will generate file system write error with certain possibility
if (FLAGS_file_system_error_injection) {
faultyFs_->setFileInjectionHook([&](FaultFileOperation* op) {
if (vectorFuzzer_.coinToss(1)) {
injectedErrorCount_++;
VELOX_FAIL(injectedErrorMsg_);
}
});
}

while (!isDone(iteration, startTime)) {
LOG(INFO) << "==============================> Started iteration "
<< iteration << " (seed: " << currentSeed_ << ")";
Expand Down Expand Up @@ -340,7 +365,9 @@ void WriterFuzzer::go() {
}
auto input = generateInputData(names, types, partitionOffset);

auto tempDirPath = exec::test::TempDirectoryPath::create();
const auto outputDirPath = exec::test::TempDirectoryPath::create(
FLAGS_file_system_error_injection);

verifyWriter(
input,
names,
Expand All @@ -351,7 +378,7 @@ void WriterFuzzer::go() {
bucketColumns,
sortColumnOffset,
sortBy,
tempDirPath->getPath());
outputDirPath);

LOG(INFO) << "==============================> Done with iteration "
<< iteration++;
Expand Down Expand Up @@ -423,11 +450,11 @@ void WriterFuzzer::verifyWriter(
const std::vector<std::string>& bucketColumns,
const int32_t sortColumnOffset,
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
const std::string& outputDirectoryPath) {
const std::shared_ptr<TempDirectoryPath>& outputDirectoryPath) {
const auto plan = PlanBuilder()
.values(input)
.tableWrite(
outputDirectoryPath,
outputDirectoryPath->getPath(),
partitionKeys,
bucketCount,
bucketColumns,
Expand All @@ -436,7 +463,18 @@ void WriterFuzzer::verifyWriter(

const auto maxDrivers =
boost::random::uniform_int_distribution<int32_t>(1, 16)(rng_);
const auto result = veloxToPrestoResult(execute(plan, maxDrivers));
RowVectorPtr result;
const uint64_t prevInjectedErrorCount = injectedErrorCount_;
try {
result = veloxToPrestoResult(execute(plan, maxDrivers));
} catch (VeloxRuntimeError& error) {
if (injectedErrorCount_ > prevInjectedErrorCount) {
VELOX_CHECK(
error.message() == injectedErrorMsg_,
"write plan failed with different error code");
return;
}
}

const auto dropSql = "DROP TABLE IF EXISTS tmp_write";
const auto sql = referenceQueryRunner_->toSql(plan).value();
Expand Down Expand Up @@ -465,11 +503,13 @@ void WriterFuzzer::verifyWriter(
const auto referencedOutputDirectoryPath =
getReferenceOutputDirectoryPath(partitionKeys.size());
comparePartitionAndBucket(
outputDirectoryPath, referencedOutputDirectoryPath, bucketCount);
outputDirectoryPath->getDelegatePath(),
referencedOutputDirectoryPath,
bucketCount);
}

// 3. Verifies data itself.
auto splits = makeSplits(outputDirectoryPath);
auto splits = makeSplits(outputDirectoryPath->getDelegatePath());
auto columnHandles =
getTableColumnHandles(names, types, partitionOffset, bucketCount);
const auto rowType = generateOutputType(names, types, bucketCount);
Expand Down Expand Up @@ -502,7 +542,8 @@ void WriterFuzzer::verifyWriter(
types.begin() + sortColumnOffset,
types.begin() + sortColumnOffset + sortBy.size()};

// Read from each file and check if data is sorted as presto sorted result.
// Read from each file and check if data is sorted as presto sorted
// result.
for (const auto& split : splits) {
auto splitReadPlan = PlanBuilder()
.tableScan(generateOutputType(
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/fuzzer/WriterFuzzerRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
#include <vector>

#include "velox/common/file/FileSystems.h"
#include "velox/common/file/tests/FaultyFileSystem.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/dwio/common/FileSink.h"
#include "velox/dwio/common/tests/FaultyFileSink.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h"
#include "velox/dwio/dwrf/RegisterDwrfWriter.h"
#include "velox/exec/fuzzer/FuzzerUtil.h"
Expand Down Expand Up @@ -74,6 +76,7 @@ class WriterFuzzerRunner {
size_t seed,
std::unique_ptr<ReferenceQueryRunner> referenceQueryRunner) {
filesystems::registerLocalFileSystem();
tests::utils::registerFaultyFileSystem();
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>());
auto hiveConnector =
Expand All @@ -87,6 +90,7 @@ class WriterFuzzerRunner {
dwrf::registerDwrfReaderFactory();
dwrf::registerDwrfWriterFactory();
dwio::common::registerFileSinks();
dwio::common::registerFaultyFileSinks();
facebook::velox::exec::test::writerFuzzer(
seed, std::move(referenceQueryRunner));
// Calling gtest here so that it can be recognized as tests in CI systems.
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/tests/utils/TempDirectoryPath.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class TempDirectoryPath {
return path_;
}

const std::string& getDelegatePath() const {
return tempPath_;
}

private:
static std::string createTempDirectory();

Expand Down

0 comments on commit 4f24a84

Please sign in to comment.