Skip to content

Commit

Permalink
Add GCS Writer Sink
Browse files Browse the repository at this point in the history
  • Loading branch information
majetideepak committed Oct 22, 2024
1 parent 1fc46ad commit 08aebcc
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 154 deletions.
2 changes: 1 addition & 1 deletion scripts/setup-helper-functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ function cmake_install {
if prompt "Do you want to rebuild ${NAME}?"; then
${SUDO} rm -rf "${BINARY_DIR}"
else
return
return 0
fi
fi

Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/storage_adapters/gcs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ velox_add_library(velox_gcs RegisterGCSFileSystem.cpp)

if(VELOX_ENABLE_GCS)
velox_sources(velox_gcs PRIVATE GCSFileSystem.cpp GCSUtil.cpp)
velox_link_libraries(velox_gcs velox_exception Folly::folly
velox_link_libraries(velox_gcs velox_dwio_common Folly::folly
google-cloud-cpp::storage)

if(${VELOX_BUILD_TESTING})
Expand Down
17 changes: 12 additions & 5 deletions velox/connectors/hive/storage_adapters/gcs/GCSUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ constexpr std::string_view kGCSScheme{"gs://"};

} // namespace

/// Sample text payload (used, for example, as the contents of the object
/// uploaded to the GCS testbench by the tests).
///
/// Declared `inline constexpr` so every translation unit that includes this
/// header refers to a single immutable object instead of receiving its own
/// mutable `static` copy.
///
/// NOTE: the adjacent literals are concatenated exactly as written; some joins
/// (e.g. "tempor" + "incididunt") carry no separating space. The bytes are
/// deliberately left unchanged.
inline constexpr std::string_view kLoremIpsum =
    "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor"
    "incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis "
    "nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."
    "Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu"
    "fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in"
    "culpa qui officia deserunt mollit anim id est laborum.";

std::string getErrorStringFromGCSError(const google::cloud::StatusCode& error);

inline bool isGCSFile(const std::string_view filename) {
Expand All @@ -40,12 +48,11 @@ inline void setBucketAndKeyFromGCSPath(
bucket = path.substr(0, firstSep);
key = path.substr(firstSep + 1);
}
inline std::string gcsURI(const std::string& bucket) {
return std::string(kGCSScheme) + bucket;
}

inline std::string gcsURI(const std::string& bucket, const std::string& key) {
return gcsURI(bucket) + kSep + key;
inline std::string gcsURI(std::string_view bucket, std::string_view key) {
std::stringstream ss;
ss << kGCSScheme << bucket << kSep << key;
return ss.str();
}

inline std::string gcsPath(const std::string_view& path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h" // @manual
#include "velox/dwio/common/FileSink.h"
#endif

namespace facebook::velox::filesystems {
Expand Down Expand Up @@ -52,11 +53,28 @@ gcsFileSystemGenerator() {
};
return filesystemGenerator;
}

/// FileSink factory for GCS paths.
///
/// @param fileURI Destination path; handled only when isGCSFile(fileURI) holds.
/// @param options Sink options; connectorProperties, pool, metricLogger and
///        stats are read from it.
/// @return A WriteFileSink wrapping a file opened for write on the file system
///         resolved for fileURI, or nullptr when fileURI is not a GCS path.
std::unique_ptr<velox::dwio::common::FileSink> gcsWriteFileSinkGenerator(
    const std::string& fileURI,
    const velox::dwio::common::FileSink::Options& options) {
  // Paths that are not GCS paths are not handled by this factory.
  if (!isGCSFile(fileURI)) {
    return nullptr;
  }

  const auto gcsFileSystem =
      filesystems::getFileSystem(fileURI, options.connectorProperties);
  auto writeFile = gcsFileSystem->openFileForWrite(
      fileURI, {{}, options.pool, std::nullopt});
  return std::make_unique<dwio::common::WriteFileSink>(
      std::move(writeFile), fileURI, options.metricLogger, options.stats);
}
#endif

/// Registers the GCS file system and the GCS write-file sink factory.
/// Compiles to an empty function when VELOX_ENABLE_GCS is not defined.
void registerGCSFileSystem() {
#ifdef VELOX_ENABLE_GCS
  // File system for paths accepted by isGCSFile.
  registerFileSystem(isGCSFile, gcsFileSystemGenerator());

  // Sink factory so that writes to GCS paths can obtain a FileSink.
  auto sinkFactory = std::function(gcsWriteFileSinkGenerator);
  dwio::common::FileSink::registerFactory(std::move(sinkFactory));
#endif
}

Expand Down
15 changes: 15 additions & 0 deletions velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,25 @@ target_link_libraries(
velox_core
velox_dwio_common_exception
velox_exec
velox_exec_test_lib
velox_file
velox_gcs
velox_hive_connector
velox_temp_path
GTest::gmock
GTest::gtest
GTest::gtest_main)

# Test executable built from GCSInsertTest.cpp and registered with CTest under
# the same name.
add_executable(velox_gcs_insert_test GCSInsertTest.cpp)
add_test(velox_gcs_insert_test velox_gcs_insert_test)
# Libraries linked into the insert test; the order is kept exactly as written.
target_link_libraries(
velox_gcs_insert_test
velox_file
velox_gcs
velox_hive_config
velox_core
velox_exec_test_lib
velox_dwio_common_exception
velox_exec
GTest::gtest
GTest::gtest_main)
174 changes: 27 additions & 147 deletions velox/connectors/hive/storage_adapters/gcs/tests/GCSFileSystemTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,125 +18,24 @@
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/connectors/hive/FileHandle.h"
#include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h"
#include "velox/connectors/hive/storage_adapters/gcs/tests/GcsTestbench.h"
#include "velox/exec/tests/utils/TempFilePath.h"

#include <boost/process.hpp>
#include <gmock/gmock-matchers.h>
#include <gmock/gmock-more-matchers.h>
#include <google/cloud/storage/client.h>
#include "gtest/gtest.h"
#include "gtest/internal/custom/gtest.h"

namespace bp = boost::process;
namespace gc = google::cloud;
namespace gcs = google::cloud::storage;
constexpr char const* kTestBenchPort{"9001"};

// Text payload inserted as the contents of the pre-existing test object.
// The adjacent literals are joined exactly as written (some joins have no
// separating space).
const std::string kLoremIpsum{
    "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor"
    "incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis "
    "nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."
    "Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu"
    "fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in"
    "culpa qui officia deserunt mollit anim id est laborum."};

/// gtest environment that launches the GCS testbench emulator as a child
/// process (`<python> -m testbench --port <port>`) and tears it down on
/// destruction.
///
/// Construction never throws on start-up failure; instead error() returns a
/// non-empty description listing the interpreters that were tried. When the
/// emulator was started, error() is empty.
class GcsTestbench : public testing::Environment {
 public:
  /// Tries each candidate python interpreter in turn until one starts the
  /// testbench module on kTestBenchPort.
  GcsTestbench() : port_(kTestBenchPort) {
    std::vector<std::string> names{"python3", "python"};
    // If the build script or application developer provides a value in the
    // PYTHON environment variable, then just use that.
    if (const auto* env = std::getenv("PYTHON")) {
      names = {env};
    }
    auto error = std::string(
        "Could not start GCS emulator."
        " Used the following list of python interpreter names:");
    for (const auto& interpreter : names) {
      auto exe_path = bp::search_path(interpreter);
      error += " " + interpreter;
      if (exe_path.empty()) {
        error += " (exe not found)";
        continue;
      }

      server_process_ = bp::child(
          boost::this_process::environment(),
          exe_path,
          "-m",
          "testbench",
          "--port",
          port_,
          group_);
      if (server_process_.valid() && server_process_.running()) {
        break;
      }
      error += " (failed to start)";
      server_process_.terminate();
      server_process_.wait();
    }
    // Success requires a child that is both valid and still running; checking
    // valid() alone would report success for a process that already exited.
    if (server_process_.valid() && server_process_.running()) {
      return;
    }
    error_ = std::move(error);
  }

  ~GcsTestbench() override {
    // Brutal shutdown, kill the full process group because the GCS testbench
    // may launch additional children.
    group_.terminate();
    if (server_process_.valid()) {
      server_process_.wait();
    }
  }

  /// Port passed to the testbench via `--port`.
  const std::string& port() const {
    return port_;
  }

  /// Empty when the emulator was started; otherwise describes the failure.
  const std::string& error() const {
    return error_;
  }

 private:
  std::string port_;
  bp::child server_process_;
  bp::group group_;
  std::string error_;
};

using namespace facebook::velox;

namespace facebook::velox::filesystems {
namespace {
class GCSFileSystemTest : public testing::Test {
protected:
static void SetUpTestSuite() {
if (testbench_ == nullptr) {
testbench_ = std::make_shared<GcsTestbench>();
testbench_->bootstrap();
}

ASSERT_THAT(testbench_, ::testing::NotNull());
ASSERT_THAT(testbench_->error(), ::testing::IsEmpty());

// Create a bucket and a small file in the testbench. This makes it easier
// to bootstrap GcsFileSystem and its tests.
auto client = gcs::Client(
google::cloud::Options{}
.set<gcs::RestEndpointOption>(
"http://localhost:" + testbench_->port())
.set<gc::UnifiedCredentialsOption>(gc::MakeInsecureCredentials()));

bucket_name_ = "test1-gcs";
google::cloud::StatusOr<gcs::BucketMetadata> bucket =
client.CreateBucketForProject(
bucket_name_, "ignored-by-testbench", gcs::BucketMetadata{});
ASSERT_TRUE(bucket.ok()) << "Failed to create bucket <" << bucket_name_
<< ">, status=" << bucket.status();

object_name_ = "test-object-name";
google::cloud::StatusOr<gcs::ObjectMetadata> object =
client.InsertObject(bucket_name_, object_name_, kLoremIpsum);
ASSERT_TRUE(object.ok()) << "Failed to create object <" << object_name_
<< ">, status=" << object.status();
}

std::shared_ptr<const config::ConfigBase> testGcsOptions() const {
Expand All @@ -148,36 +47,14 @@ class GCSFileSystemTest : public testing::Test {
std::move(configOverride));
}

std::string preexistingBucketName() {
return bucket_name_;
}

std::string preexistingBucketPath() {
return bucket_name_ + '/';
}

std::string preexistingObjectName() {
return object_name_;
}

std::string preexistingObjectPath() {
return preexistingBucketPath() + preexistingObjectName();
}
static void TearDownTestSuite() {}

static std::shared_ptr<GcsTestbench> testbench_;
static std::string bucket_name_;
static std::string object_name_;
};

std::shared_ptr<GcsTestbench> GCSFileSystemTest::testbench_ =
nullptr; // will be destroyed on destructor
std::string GCSFileSystemTest::bucket_name_;
std::string GCSFileSystemTest::object_name_;
std::shared_ptr<GcsTestbench> GCSFileSystemTest::testbench_ = nullptr;

TEST_F(GCSFileSystemTest, readFile) {
const std::string gcsFile =
gcsURI(preexistingBucketName(), preexistingObjectName());
const auto gcsFile = gcsURI(
testbench_->preexistingBucketName(), testbench_->preexistingObjectName());

filesystems::GCSFileSystem gcfs(testGcsOptions());
gcfs.initializeClient();
Expand Down Expand Up @@ -213,8 +90,8 @@ TEST_F(GCSFileSystemTest, readFile) {
}

TEST_F(GCSFileSystemTest, writeAndReadFile) {
const std::string newFile = "readWriteFile.txt";
const std::string gcsFile = gcsURI(preexistingBucketName(), newFile);
const std::string_view newFile = "readWriteFile.txt";
const auto gcsFile = gcsURI(testbench_->preexistingBucketName(), newFile);

filesystems::GCSFileSystem gcfs(testGcsOptions());
gcfs.initializeClient();
Expand Down Expand Up @@ -243,19 +120,19 @@ TEST_F(GCSFileSystemTest, writeAndReadFile) {
}

// Opening an object that already exists for write must throw
// "File already exists".
// NOTE(review): "readWriteFile.txt" is the object name used by the
// writeAndReadFile test; presumably this test relies on that test having
// created it first — confirm the ordering assumption.
TEST_F(GCSFileSystemTest, openExistingFileForWrite) {
  const std::string_view newFile = "readWriteFile.txt";
  const auto gcsFile = gcsURI(testbench_->preexistingBucketName(), newFile);

  filesystems::GCSFileSystem gcfs(testGcsOptions());
  gcfs.initializeClient();
  VELOX_ASSERT_THROW(gcfs.openFileForWrite(gcsFile), "File already exists");
}

TEST_F(GCSFileSystemTest, renameNotImplemented) {
const char* file = "newTest.txt";
const std::string gcsExistingFile =
gcsURI(preexistingBucketName(), preexistingObjectName());
const std::string gcsNewFile = gcsURI(preexistingBucketName(), file);
const std::string_view file = "newTest.txt";
const auto gcsExistingFile = gcsURI(
testbench_->preexistingBucketName(), testbench_->preexistingObjectName());
const auto gcsNewFile = gcsURI(testbench_->preexistingBucketName(), file);
filesystems::GCSFileSystem gcfs(testGcsOptions());
gcfs.initializeClient();
gcfs.openFileForRead(gcsExistingFile);
Expand All @@ -265,25 +142,25 @@ TEST_F(GCSFileSystemTest, renameNotImplemented) {
}

// mkdir on a GCS path is expected to throw "mkdir for GCS not implemented".
TEST_F(GCSFileSystemTest, mkdirNotImplemented) {
  const std::string_view dir = "newDirectory";
  const auto gcsNewDirectory = gcsURI(testbench_->preexistingBucketName(), dir);
  filesystems::GCSFileSystem gcfs(testGcsOptions());
  gcfs.initializeClient();
  VELOX_ASSERT_THROW(
      gcfs.mkdir(gcsNewDirectory), "mkdir for GCS not implemented");
}

// rmdir on a GCS path is expected to throw "rmdir for GCS not implemented".
TEST_F(GCSFileSystemTest, rmdirNotImplemented) {
  const std::string_view dir = "Directory";
  const auto gcsDirectory = gcsURI(testbench_->preexistingBucketName(), dir);
  filesystems::GCSFileSystem gcfs(testGcsOptions());
  gcfs.initializeClient();
  VELOX_ASSERT_THROW(gcfs.rmdir(gcsDirectory), "rmdir for GCS not implemented");
}

TEST_F(GCSFileSystemTest, missingFile) {
const char* file = "newTest.txt";
const std::string gcsFile = gcsURI(preexistingBucketName(), file);
const std::string_view file = "newTest.txt";
const auto gcsFile = gcsURI(testbench_->preexistingBucketName(), file);
filesystems::GCSFileSystem gcfs(testGcsOptions());
gcfs.initializeClient();
VELOX_ASSERT_RUNTIME_THROW_CODE(
Expand All @@ -295,7 +172,7 @@ TEST_F(GCSFileSystemTest, missingFile) {
TEST_F(GCSFileSystemTest, missingBucket) {
filesystems::GCSFileSystem gcfs(testGcsOptions());
gcfs.initializeClient();
const char* gcsFile = "gs://dummy/foo.txt";
const std::string_view gcsFile = "gs://dummy/foo.txt";
VELOX_ASSERT_RUNTIME_THROW_CODE(
gcfs.openFileForRead(gcsFile),
error_code::kFileNotFound,
Expand Down Expand Up @@ -357,8 +234,9 @@ TEST_F(GCSFileSystemTest, credentialsConfig) {
filesystems::GCSFileSystem gcfs(conf);
gcfs.initializeClient();
try {
const std::string gcsFile =
gcsURI(preexistingBucketName(), preexistingObjectName());
const std::string gcsFile = gcsURI(
testbench_->preexistingBucketName(),
testbench_->preexistingObjectName());
gcfs.openFileForRead(gcsFile);
FAIL() << "Expected VeloxException";
} catch (VeloxException const& err) {
Expand All @@ -368,3 +246,5 @@ TEST_F(GCSFileSystemTest, credentialsConfig) {
err.message(), testing::HasSubstr("Invalid ServiceAccountCredentials"));
}
}
} // namespace
} // namespace facebook::velox::filesystems
Loading

0 comments on commit 08aebcc

Please sign in to comment.