From 08aebccc35642d127b5d23c6de08ca15ad9ffc31 Mon Sep 17 00:00:00 2001 From: Deepak Majeti Date: Tue, 22 Oct 2024 18:21:53 -0400 Subject: [PATCH] Add GCS Writer Sink --- scripts/setup-helper-functions.sh | 2 +- .../hive/storage_adapters/gcs/CMakeLists.txt | 2 +- .../hive/storage_adapters/gcs/GCSUtil.h | 17 +- .../gcs/RegisterGCSFileSystem.cpp | 18 ++ .../storage_adapters/gcs/tests/CMakeLists.txt | 15 ++ .../gcs/tests/GCSFileSystemTest.cpp | 174 +++--------------- .../gcs/tests/GCSInsertTest.cpp | 149 +++++++++++++++ .../storage_adapters/gcs/tests/GcsTestbench.h | 133 +++++++++++++ 8 files changed, 356 insertions(+), 154 deletions(-) create mode 100644 velox/connectors/hive/storage_adapters/gcs/tests/GCSInsertTest.cpp create mode 100644 velox/connectors/hive/storage_adapters/gcs/tests/GcsTestbench.h diff --git a/scripts/setup-helper-functions.sh b/scripts/setup-helper-functions.sh index 4c332bb305f42..43010223a5ff4 100755 --- a/scripts/setup-helper-functions.sh +++ b/scripts/setup-helper-functions.sh @@ -194,7 +194,7 @@ function cmake_install { if prompt "Do you want to rebuild ${NAME}?"; then ${SUDO} rm -rf "${BINARY_DIR}" else - return + return 0 fi fi diff --git a/velox/connectors/hive/storage_adapters/gcs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/gcs/CMakeLists.txt index c5ac37c73fd52..57ee434d50f43 100644 --- a/velox/connectors/hive/storage_adapters/gcs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/gcs/CMakeLists.txt @@ -18,7 +18,7 @@ velox_add_library(velox_gcs RegisterGCSFileSystem.cpp) if(VELOX_ENABLE_GCS) velox_sources(velox_gcs PRIVATE GCSFileSystem.cpp GCSUtil.cpp) - velox_link_libraries(velox_gcs velox_exception Folly::folly + velox_link_libraries(velox_gcs velox_dwio_common Folly::folly google-cloud-cpp::storage) if(${VELOX_BUILD_TESTING}) diff --git a/velox/connectors/hive/storage_adapters/gcs/GCSUtil.h b/velox/connectors/hive/storage_adapters/gcs/GCSUtil.h index 3c545f9d52fb0..f4aedc0815336 100644 --- a/velox/connectors/hive/storage_adapters/gcs/GCSUtil.h +++ b/velox/connectors/hive/storage_adapters/gcs/GCSUtil.h @@ -26,6 +26,14 @@ constexpr std::string_view kGCSScheme{"gs://"}; } // namespace +static std::string_view kLoremIpsum = + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor" + "incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis " + "nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat." + "Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu" + "fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in" + "culpa qui officia deserunt mollit anim id est laborum."; + std::string getErrorStringFromGCSError(const google::cloud::StatusCode& error); inline bool isGCSFile(const std::string_view filename) { @@ -40,12 +48,11 @@ inline void setBucketAndKeyFromGCSPath( bucket = path.substr(0, firstSep); key = path.substr(firstSep + 1); } -inline std::string gcsURI(const std::string& bucket) { - return std::string(kGCSScheme) + bucket; -} -inline std::string gcsURI(const std::string& bucket, const std::string& key) { - return gcsURI(bucket) + kSep + key; +inline std::string gcsURI(std::string_view bucket, std::string_view key) { + std::stringstream ss; + ss << kGCSScheme << bucket << kSep << key; + return ss.str(); } inline std::string gcsPath(const std::string_view& path) { diff --git a/velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.cpp b/velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.cpp index 3474c8d4dfb9b..cb70a1a6087c5 100644 --- a/velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.cpp @@ -18,6 +18,7 @@ #include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h" // @manual +#include "velox/dwio/common/FileSink.h" #endif namespace facebook::velox::filesystems { @@ -52,11 +53,28 @@ gcsFileSystemGenerator() { }; return filesystemGenerator; } + +std::unique_ptr gcsWriteFileSinkGenerator( + const std::string& fileURI, + const velox::dwio::common::FileSink::Options& options) { + if (isGCSFile(fileURI)) { + auto fileSystem = + filesystems::getFileSystem(fileURI, options.connectorProperties); + return std::make_unique( + fileSystem->openFileForWrite(fileURI, {{}, options.pool, std::nullopt}), + fileURI, + options.metricLogger, + options.stats); + } + return nullptr; +} #endif void registerGCSFileSystem() { #ifdef VELOX_ENABLE_GCS registerFileSystem(isGCSFile, gcsFileSystemGenerator()); + dwio::common::FileSink::registerFactory( + std::function(gcsWriteFileSinkGenerator)); #endif } diff --git a/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt b/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt index 1b43a54c4efec..e88c62ed589f9 100644 --- a/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/gcs/tests/CMakeLists.txt @@ -19,6 +19,7 @@ target_link_libraries( velox_core velox_dwio_common_exception velox_exec + velox_exec_test_lib velox_file velox_gcs velox_hive_connector @@ -26,3 +27,17 @@ target_link_libraries( GTest::gmock GTest::gtest GTest::gtest_main) + +add_executable(velox_gcs_insert_test GCSInsertTest.cpp) +add_test(velox_gcs_insert_test velox_gcs_insert_test) +target_link_libraries( + velox_gcs_insert_test + velox_file + velox_gcs + velox_hive_config + velox_core + velox_exec_test_lib + velox_dwio_common_exception + velox_exec + GTest::gtest + GTest::gtest_main) diff --git a/velox/connectors/hive/storage_adapters/gcs/tests/GCSFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/gcs/tests/GCSFileSystemTest.cpp index 69832a1b18961..4dad9f1ee9346 100644 --- a/velox/connectors/hive/storage_adapters/gcs/tests/GCSFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/tests/GCSFileSystemTest.cpp @@ -18,125 +18,24 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/config/Config.h" #include "velox/common/file/File.h" -#include "velox/connectors/hive/FileHandle.h" #include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h" +#include "velox/connectors/hive/storage_adapters/gcs/tests/GcsTestbench.h" #include "velox/exec/tests/utils/TempFilePath.h" -#include -#include -#include -#include #include "gtest/gtest.h" -#include "gtest/internal/custom/gtest.h" -namespace bp = boost::process; namespace gc = google::cloud; namespace gcs = google::cloud::storage; -constexpr char const* kTestBenchPort{"9001"}; - -const std::string kLoremIpsum = - "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor" - "incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis " - "nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat." - "Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu" - "fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in" - "culpa qui officia deserunt mollit anim id est laborum."; - -class GcsTestbench : public testing::Environment { - public: - GcsTestbench() : port_(kTestBenchPort) { - std::vector names{"python3", "python"}; - // If the build script or application developer provides a value in the - // PYTHON environment variable, then just use that. - if (const auto* env = std::getenv("PYTHON")) { - names = {env}; - } - auto error = std::string( - "Coud not start GCS emulator." - " Used the following list of python interpreter names:"); - for (const auto& interpreter : names) { - auto exe_path = bp::search_path(interpreter); - error += " " + interpreter; - if (exe_path.empty()) { - error += " (exe not found)"; - continue; - } - - server_process_ = bp::child( - boost::this_process::environment(), - exe_path, - "-m", - "testbench", - "--port", - port_, - group_); - if (server_process_.valid() && server_process_.running()) - break; - error += " (failed to start)"; - server_process_.terminate(); - server_process_.wait(); - } - if (server_process_.valid() && server_process_.valid()) - return; - error_ = std::move(error); - } - - ~GcsTestbench() override { - // Brutal shutdown, kill the full process group because the GCS testbench - // may launch additional children. - group_.terminate(); - if (server_process_.valid()) { - server_process_.wait(); - } - } - - const std::string& port() const { - return port_; - } - - const std::string& error() const { - return error_; - } - - private: - std::string port_; - bp::child server_process_; - bp::group group_; - std::string error_; -}; - -using namespace facebook::velox; +namespace facebook::velox::filesystems { +namespace { class GCSFileSystemTest : public testing::Test { protected: static void SetUpTestSuite() { if (testbench_ == nullptr) { testbench_ = std::make_shared(); + testbench_->bootstrap(); } - - ASSERT_THAT(testbench_, ::testing::NotNull()); - ASSERT_THAT(testbench_->error(), ::testing::IsEmpty()); - - // Create a bucket and a small file in the testbench. This makes it easier - // to bootstrap GcsFileSystem and its tests. - auto client = gcs::Client( - google::cloud::Options{} - .set( - "http://localhost:" + testbench_->port()) - .set(gc::MakeInsecureCredentials())); - - bucket_name_ = "test1-gcs"; - google::cloud::StatusOr bucket = - client.CreateBucketForProject( - bucket_name_, "ignored-by-testbench", gcs::BucketMetadata{}); - ASSERT_TRUE(bucket.ok()) << "Failed to create bucket <" << bucket_name_ - << ">, status=" << bucket.status(); - - object_name_ = "test-object-name"; - google::cloud::StatusOr object = - client.InsertObject(bucket_name_, object_name_, kLoremIpsum); - ASSERT_TRUE(object.ok()) << "Failed to create object <" << object_name_ - << ">, status=" << object.status(); } std::shared_ptr testGcsOptions() const { @@ -148,36 +47,14 @@ class GCSFileSystemTest : public testing::Test { std::move(configOverride)); } - std::string preexistingBucketName() { - return bucket_name_; - } - - std::string preexistingBucketPath() { - return bucket_name_ + '/'; - } - - std::string preexistingObjectName() { - return object_name_; - } - - std::string preexistingObjectPath() { - return preexistingBucketPath() + preexistingObjectName(); - } - static void TearDownTestSuite() {} - static std::shared_ptr testbench_; - static std::string bucket_name_; - static std::string object_name_; }; -std::shared_ptr GCSFileSystemTest::testbench_ = - nullptr; // will be destroyed on destructor -std::string GCSFileSystemTest::bucket_name_; -std::string GCSFileSystemTest::object_name_; +std::shared_ptr GCSFileSystemTest::testbench_ = nullptr; TEST_F(GCSFileSystemTest, readFile) { - const std::string gcsFile = - gcsURI(preexistingBucketName(), preexistingObjectName()); + const auto gcsFile = gcsURI( + testbench_->preexistingBucketName(), testbench_->preexistingObjectName()); filesystems::GCSFileSystem gcfs(testGcsOptions()); gcfs.initializeClient(); @@ -213,8 +90,8 @@ TEST_F(GCSFileSystemTest, readFile) { } TEST_F(GCSFileSystemTest, writeAndReadFile) { - const std::string newFile = "readWriteFile.txt"; - const std::string gcsFile = gcsURI(preexistingBucketName(), newFile); + const std::string_view newFile = "readWriteFile.txt"; + const auto gcsFile = gcsURI(testbench_->preexistingBucketName(), newFile); filesystems::GCSFileSystem gcfs(testGcsOptions()); gcfs.initializeClient(); @@ -243,8 +120,8 @@ TEST_F(GCSFileSystemTest, writeAndReadFile) { } TEST_F(GCSFileSystemTest, openExistingFileForWrite) { - const std::string newFile = "readWriteFile.txt"; - const std::string gcsFile = gcsURI(preexistingBucketName(), newFile); + const std::string_view newFile = "readWriteFile.txt"; + const auto gcsFile = gcsURI(testbench_->preexistingBucketName(), newFile); filesystems::GCSFileSystem gcfs(testGcsOptions()); gcfs.initializeClient(); @@ -252,10 +129,10 @@ TEST_F(GCSFileSystemTest, openExistingFileForWrite) { } TEST_F(GCSFileSystemTest, renameNotImplemented) { - const char* file = "newTest.txt"; - const std::string gcsExistingFile = - gcsURI(preexistingBucketName(), preexistingObjectName()); - const std::string gcsNewFile = gcsURI(preexistingBucketName(), file); + const std::string_view file = "newTest.txt"; + const auto gcsExistingFile = gcsURI( + testbench_->preexistingBucketName(), testbench_->preexistingObjectName()); + const auto gcsNewFile = gcsURI(testbench_->preexistingBucketName(), file); filesystems::GCSFileSystem gcfs(testGcsOptions()); gcfs.initializeClient(); gcfs.openFileForRead(gcsExistingFile); @@ -265,8 +142,8 @@ TEST_F(GCSFileSystemTest, renameNotImplemented) { } TEST_F(GCSFileSystemTest, mkdirNotImplemented) { - const char* dir = "newDirectory"; - const std::string gcsNewDirectory = gcsURI(preexistingBucketName(), dir); + const std::string_view dir = "newDirectory"; + const auto gcsNewDirectory = gcsURI(testbench_->preexistingBucketName(), dir); filesystems::GCSFileSystem gcfs(testGcsOptions()); gcfs.initializeClient(); VELOX_ASSERT_THROW( @@ -274,16 +151,16 @@ TEST_F(GCSFileSystemTest, mkdirNotImplemented) { } TEST_F(GCSFileSystemTest, rmdirNotImplemented) { - const char* dir = "Directory"; - const std::string gcsDirectory = gcsURI(preexistingBucketName(), dir); + const std::string_view dir = "Directory"; + const auto gcsDirectory = gcsURI(testbench_->preexistingBucketName(), dir); filesystems::GCSFileSystem gcfs(testGcsOptions()); gcfs.initializeClient(); VELOX_ASSERT_THROW(gcfs.rmdir(gcsDirectory), "rmdir for GCS not implemented"); } TEST_F(GCSFileSystemTest, missingFile) { - const char* file = "newTest.txt"; - const std::string gcsFile = gcsURI(preexistingBucketName(), file); + const std::string_view file = "newTest.txt"; + const auto gcsFile = gcsURI(testbench_->preexistingBucketName(), file); filesystems::GCSFileSystem gcfs(testGcsOptions()); gcfs.initializeClient(); VELOX_ASSERT_RUNTIME_THROW_CODE( @@ -295,7 +172,7 @@ TEST_F(GCSFileSystemTest, missingFile) { TEST_F(GCSFileSystemTest, missingBucket) { filesystems::GCSFileSystem gcfs(testGcsOptions()); gcfs.initializeClient(); - const char* gcsFile = "gs://dummy/foo.txt"; + const std::string_view gcsFile = "gs://dummy/foo.txt"; VELOX_ASSERT_RUNTIME_THROW_CODE( gcfs.openFileForRead(gcsFile), error_code::kFileNotFound, @@ -357,8 +234,9 @@ TEST_F(GCSFileSystemTest, credentialsConfig) { filesystems::GCSFileSystem gcfs(conf); gcfs.initializeClient(); try { - const std::string gcsFile = - gcsURI(preexistingBucketName(), preexistingObjectName()); + const std::string gcsFile = gcsURI( + testbench_->preexistingBucketName(), + testbench_->preexistingObjectName()); gcfs.openFileForRead(gcsFile); FAIL() << "Expected VeloxException"; } catch (VeloxException const& err) { @@ -368,3 +246,5 @@ TEST_F(GCSFileSystemTest, credentialsConfig) { err.message(), testing::HasSubstr("Invalid ServiceAccountCredentials")); } } +} // namespace +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/gcs/tests/GCSInsertTest.cpp b/velox/connectors/hive/storage_adapters/gcs/tests/GCSInsertTest.cpp new file mode 100644 index 0000000000000..9f3a7b431ca75 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/gcs/tests/GCSInsertTest.cpp @@ -0,0 +1,149 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "velox/common/memory/Memory.h" +#include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h" +#include "velox/connectors/hive/storage_adapters/gcs/tests/GcsTestbench.h" +#include "velox/dwio/parquet/RegisterParquetReader.h" +#include "velox/dwio/parquet/RegisterParquetWriter.h" +#include "velox/exec/TableWriter.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +namespace bp = boost::process; +namespace gc = google::cloud; +namespace gcs = google::cloud::storage; + +using namespace facebook::velox::exec::test; + +namespace facebook::velox::filesystems { +namespace { + +class GCSInsertTest : public testing::Test, public test::VectorTestBase { + protected: + static void SetUpTestSuite() { + memory::MemoryManager::testingSetInstance({}); + if (testbench_ == nullptr) { + testbench_ = std::make_shared(); + testbench_->bootstrap(); + } + } + + void SetUp() override { + connector::registerConnectorFactory( + std::make_shared()); + auto hiveConnector = + connector::getConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName) + ->newConnector( + exec::test::kHiveConnectorId, gcsOptions(), ioExecutor_.get()); + connector::registerConnector(hiveConnector); + parquet::registerParquetReaderFactory(); + parquet::registerParquetWriterFactory(); + ioExecutor_ = std::make_unique(3); + } + + void TearDown() override { + parquet::unregisterParquetReaderFactory(); + parquet::unregisterParquetWriterFactory(); + connector::unregisterConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName); + connector::unregisterConnector(exec::test::kHiveConnectorId); + } + + std::shared_ptr gcsOptions() const { + static std::unordered_map configOverride = {}; + + configOverride["hive.gcs.scheme"] = "http"; + configOverride["hive.gcs.endpoint"] = "localhost:" + testbench_->port(); + return std::make_shared( + std::move(configOverride)); + } + + static std::shared_ptr testbench_; + std::unique_ptr ioExecutor_; +}; + +std::shared_ptr GCSInsertTest::testbench_ = nullptr; +} // namespace + +TEST_F(GCSInsertTest, gcsInsertTest) { + const int64_t kExpectedRows = 1'000; + const std::string_view newFile = "insertFile.txt"; + const auto gcsFile = gcsURI(testbench_->preexistingBucketName(), newFile); + + auto rowType = ROW( + {"c0", "c1", "c2", "c3"}, {BIGINT(), INTEGER(), SMALLINT(), DOUBLE()}); + + auto input = makeRowVector( + {makeFlatVector(kExpectedRows, [](auto row) { return row; }), + makeFlatVector(kExpectedRows, [](auto row) { return row; }), + makeFlatVector(kExpectedRows, [](auto row) { return row; }), + makeFlatVector(kExpectedRows, [](auto row) { return row; })}); + + // Insert into GCS with one writer. + auto plan = PlanBuilder() + .values({input}) + .tableWrite(gcsFile.data(), dwio::common::FileFormat::PARQUET) + .planNode(); + + // Execute the write plan. + auto results = AssertQueryBuilder(plan).copyResults(pool()); + + // First column has number of rows written in the first row and nulls in other + // rows. + auto rowCount = results->childAt(exec::TableWriteTraits::kRowCountChannel) + ->as>(); + ASSERT_FALSE(rowCount->isNullAt(0)); + ASSERT_EQ(kExpectedRows, rowCount->valueAt(0)); + ASSERT_TRUE(rowCount->isNullAt(1)); + + // Second column contains details about written files. + auto details = results->childAt(exec::TableWriteTraits::kFragmentChannel) + ->as>(); + ASSERT_TRUE(details->isNullAt(0)); + ASSERT_FALSE(details->isNullAt(1)); + folly::dynamic obj = folly::parseJson(details->valueAt(1)); + + ASSERT_EQ(kExpectedRows, obj["rowCount"].asInt()); + auto fileWriteInfos = obj["fileWriteInfos"]; + ASSERT_EQ(1, fileWriteInfos.size()); + + auto writeFileName = fileWriteInfos[0]["writeFileName"].asString(); + + // Read from 'writeFileName' and verify the data matches the original. + plan = PlanBuilder().tableScan(rowType).planNode(); + + auto filePath = fmt::format("{}{}", gcsFile, writeFileName); + const int64_t fileSize = fileWriteInfos[0]["fileSize"].asInt(); + auto split = HiveConnectorSplitBuilder(filePath) + .fileFormat(dwio::common::FileFormat::PARQUET) + .length(fileSize) + .build(); + auto copy = AssertQueryBuilder(plan).split(split).copyResults(pool()); + assertEqualResults({input}, {copy}); +} +} // namespace facebook::velox::filesystems + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::Init init{&argc, &argv, false}; + return RUN_ALL_TESTS(); +} diff --git a/velox/connectors/hive/storage_adapters/gcs/tests/GcsTestbench.h b/velox/connectors/hive/storage_adapters/gcs/tests/GcsTestbench.h new file mode 100644 index 0000000000000..38e5bd561bf90 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/gcs/tests/GcsTestbench.h @@ -0,0 +1,133 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include "gtest/gtest.h" + +#include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h" +#include "velox/exec/tests/utils/PortUtil.h" + +namespace bp = boost::process; +namespace gc = google::cloud; +namespace gcs = google::cloud::storage; + +namespace facebook::velox::filesystems { + +class GcsTestbench : public testing::Environment { + public: + GcsTestbench() { + auto port = facebook::velox::exec::test::getFreePorts(1); + port_ = std::to_string(port[0]); + std::vector names{"python3", "python"}; + // If the build script or application developer provides a value in the + // PYTHON environment variable, then just use that. + if (const auto* env = std::getenv("PYTHON")) { + names = {env}; + } + auto error = std::string( + "Coud not start GCS emulator." + " Used the following list of python interpreter names:"); + for (const auto& interpreter : names) { + auto exe_path = bp::search_path(interpreter); + error += " " + interpreter; + if (exe_path.empty()) { + error += " (exe not found)"; + continue; + } + + serverProcess_ = bp::child( + boost::this_process::environment(), + exe_path, + "-m", + "testbench", + "--port", + port_, + group_); + if (serverProcess_.valid() && serverProcess_.running()) + break; + error += " (failed to start)"; + serverProcess_.terminate(); + serverProcess_.wait(); + } + if (serverProcess_.valid() && serverProcess_.valid()) + return; + error_ = std::move(error); + } + + ~GcsTestbench() override { + // Brutal shutdown, kill the full process group because the GCS testbench + // may launch additional children. + group_.terminate(); + if (serverProcess_.valid()) { + serverProcess_.wait(); + } + } + + const std::string& port() const { + return port_; + } + + const std::string& error() const { + return error_; + } + + std::string_view preexistingBucketName() { + return bucketName_; + } + + std::string_view preexistingObjectName() { + return objectName_; + } + + void bootstrap() { + ASSERT_THAT(this, ::testing::NotNull()); + ASSERT_THAT(this->error(), ::testing::IsEmpty()); + + // Create a bucket and a small file in the testbench. This makes it easier + // to bootstrap GcsFileSystem and its tests. + auto client = gcs::Client( + google::cloud::Options{} + .set("http://localhost:" + this->port()) + .set(gc::MakeInsecureCredentials())); + + bucketName_ = "test1-gcs"; + google::cloud::StatusOr bucket = + client.CreateBucketForProject( + bucketName_, "ignored-by-testbench", gcs::BucketMetadata{}); + ASSERT_TRUE(bucket.ok()) << "Failed to create bucket <" << bucketName_ + << ">, status=" << bucket.status(); + + objectName_ = "test-object-name"; + google::cloud::StatusOr object = + client.InsertObject(bucketName_, objectName_, kLoremIpsum); + ASSERT_TRUE(object.ok()) << "Failed to create object <" << objectName_ + << ">, status=" << object.status(); + } + + private: + std::string port_; + bp::child serverProcess_; + bp::group group_; + std::string error_; + std::string bucketName_; + std::string objectName_; +}; + +} // namespace facebook::velox::filesystems