diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp index de889d6cb5d7..50b3e360c3e4 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp @@ -25,15 +25,14 @@ namespace facebook::velox::filesystems { AbfsConfig::AbfsConfig( std::string_view path, - const config::ConfigBase& config, - bool initDfsClient) { + const config::ConfigBase& config) { std::string_view file; - bool isHttps = true; + isHttps_ = true; if (path.find(kAbfssScheme) == 0) { file = path.substr(kAbfssScheme.size()); } else if (path.find(kAbfsScheme) == 0) { file = path.substr(kAbfsScheme.size()); - isHttps = false; + isHttps_ = false; } else { VELOX_FAIL("Invalid ABFS Path {}", path); } @@ -42,39 +41,24 @@ AbfsConfig::AbfsConfig( fileSystem_ = file.substr(0, firstAt); auto firstSep = file.find_first_of("/"); filePath_ = file.substr(firstSep + 1); - auto accountNameWithSuffix = file.substr(firstAt + 1, firstSep - firstAt - 1); - std::string accountNameWithSuffixForUrl(accountNameWithSuffix); - if (!initDfsClient) { - // We should use correct suffix for blob client. - size_t start_pos = accountNameWithSuffixForUrl.find("dfs"); - if (start_pos != std::string::npos) { - accountNameWithSuffixForUrl.replace(start_pos, 3, "blob"); - } - } - - url_ = fmt::format( - "{}{}/{}/{}", - isHttps ? "https://" : "http://", - accountNameWithSuffixForUrl, - fileSystem_, - filePath_); + accountNameWithSuffix_ = file.substr(firstAt + 1, firstSep - firstAt - 1); auto authTypeKey = - fmt::format("{}.{}", kAzureAccountAuthType, accountNameWithSuffix); + fmt::format("{}.{}", kAzureAccountAuthType, accountNameWithSuffix_); authType_ = "SharedKey"; if (config.valueExists(authTypeKey)) { authType_ = config.get(authTypeKey).value(); } if (authType_ == "SharedKey") { auto credKey = - fmt::format("{}.{}", kAzureAccountKey, accountNameWithSuffix); + fmt::format("{}.{}", kAzureAccountKey, accountNameWithSuffix_); VELOX_USER_CHECK( config.valueExists(credKey), "Config {} not found", credKey); - auto firstDot = accountNameWithSuffix.find_first_of("."); - auto accountName = accountNameWithSuffix.substr(0, firstDot); - auto endpointSuffix = accountNameWithSuffix.substr(firstDot + 5); + auto firstDot = accountNameWithSuffix_.find_first_of("."); + auto accountName = accountNameWithSuffix_.substr(0, firstDot); + auto endpointSuffix = accountNameWithSuffix_.substr(firstDot + 5); std::stringstream ss; - ss << "DefaultEndpointsProtocol=" << (isHttps ? "https" : "http"); + ss << "DefaultEndpointsProtocol=" << (isHttps_ ? "https" : "http"); ss << ";AccountName=" << accountName; ss << ";AccountKey=" << config.get(credKey).value(); ss << ";EndpointSuffix=" << endpointSuffix; @@ -87,11 +71,11 @@ AbfsConfig::AbfsConfig( connectionString_ = ss.str(); } else if (authType_ == "OAuth") { auto clientIdKey = fmt::format( - "{}.{}", kAzureAccountOAuth2ClientId, accountNameWithSuffix); + "{}.{}", kAzureAccountOAuth2ClientId, accountNameWithSuffix_); auto clientSecretKey = fmt::format( - "{}.{}", kAzureAccountOAuth2ClientSecret, accountNameWithSuffix); + "{}.{}", kAzureAccountOAuth2ClientSecret, accountNameWithSuffix_); auto clientEndpointKey = fmt::format( - "{}.{}", kAzureAccountOAuth2ClientEndpoint, accountNameWithSuffix); + "{}.{}", kAzureAccountOAuth2ClientEndpoint, accountNameWithSuffix_); VELOX_USER_CHECK( config.valueExists(clientIdKey), "Config {} not found", clientIdKey); VELOX_USER_CHECK( @@ -116,10 +100,9 @@ AbfsConfig::AbfsConfig( config.get(clientSecretKey).value(), options); } else if (authType_ == "SAS") { - auto sasKey = fmt::format("{}.{}", kAzureSASKey, accountNameWithSuffix); + auto sasKey = fmt::format("{}.{}", kAzureSASKey, accountNameWithSuffix_); VELOX_USER_CHECK(config.valueExists(sasKey), "Config {} not found", sasKey); - urlWithSasToken_ = - fmt::format("{}?{}", url_, config.get(sasKey).value()); + sas_ = config.get(sasKey).value(); } else { VELOX_USER_FAIL( "Unsupported auth type {}, supported auth types are SharedKey, OAuth and SAS.", @@ -127,4 +110,49 @@ AbfsConfig::AbfsConfig( } } +std::unique_ptr AbfsConfig::getReadFileClient() { + if (authType_ == "SAS") { + auto url = getUrl(true); + return std::make_unique(fmt::format("{}?{}", url, sas_)); + } else if (authType_ == "OAuth") { + auto url = getUrl(true); + return std::make_unique(url, tokenCredential_); + } else { + return std::make_unique(BlobClient::CreateFromConnectionString( + connectionString_, fileSystem_, filePath_)); + } +} + +std::unique_ptr AbfsConfig::getWriteFileClient() { + if (authType_ == "SAS") { + auto url = getUrl(false); + return std::make_unique( + fmt::format("{}?{}", url, sas_)); + } else if (authType_ == "OAuth") { + auto url = getUrl(false); + return std::make_unique(url, tokenCredential_); + } else { + return std::make_unique( + DataLakeFileClient::CreateFromConnectionString( + connectionString_, fileSystem_, filePath_)); + } +} + +std::string AbfsConfig::getUrl(bool withblobSuffix) { + std::string accountNameWithSuffixForUrl(accountNameWithSuffix_); + if (withblobSuffix) { + // We should use correct suffix for blob client. + size_t start_pos = accountNameWithSuffixForUrl.find("dfs"); + if (start_pos != std::string::npos) { + accountNameWithSuffixForUrl.replace(start_pos, 3, "blob"); + } + } + return fmt::format( + "{}{}/{}/{}", + isHttps_ ? "https://" : "http://", + accountNameWithSuffixForUrl, + fileSystem_, + filePath_); +} + } // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h b/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h index a21959ef9ee9..b72ba906dbbb 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h @@ -17,9 +17,14 @@ #pragma once #include +#include +#include #include #include +using namespace Azure::Storage::Blobs; +using namespace Azure::Storage::Files::DataLake; + namespace facebook::velox::config { class ConfigBase; } @@ -27,78 +32,74 @@ class ConfigBase; namespace facebook::velox::filesystems { // This is used to specify the Azurite endpoint in testing. -static std::string kAzureBlobEndpoint{"fs.azure.blob-endpoint"}; +static constexpr const char* kAzureBlobEndpoint{"fs.azure.blob-endpoint"}; // The authentication mechanism is set in `fs.azure.account.auth.type` (or the // account specific variant). The supported values are SharedKey, OAuth and SAS. -static std::string kAzureAccountAuthType{"fs.azure.account.auth.type"}; +static constexpr const char* kAzureAccountAuthType = + "fs.azure.account.auth.type"; -static std::string kAzureAccountKey{"fs.azure.account.key"}; +static constexpr const char* kAzureAccountKey = "fs.azure.account.key"; -static std::string kAzureSASKey{"fs.azure.sas.fixed.token"}; +static constexpr const char* kAzureSASKey = "fs.azure.sas.fixed.token"; -static std::string kAzureAccountOAuth2ClientId{ - "fs.azure.account.oauth2.client.id"}; -static std::string kAzureAccountOAuth2ClientSecret{ - "fs.azure.account.oauth2.client.secret"}; +static constexpr const char* kAzureAccountOAuth2ClientId = + "fs.azure.account.oauth2.client.id"; +static constexpr const char* kAzureAccountOAuth2ClientSecret = + "fs.azure.account.oauth2.client.secret"; // Token end point, this can be found through Azure portal. For example: // https://login.microsoftonline.com/{TENANTID}/oauth2/token -static std::string kAzureAccountOAuth2ClientEndpoint{ - "fs.azure.account.oauth2.client.endpoint"}; +static constexpr const char* kAzureAccountOAuth2ClientEndpoint = + "fs.azure.account.oauth2.client.endpoint"; class AbfsConfig { public: - explicit AbfsConfig( - std::string_view path, - const config::ConfigBase& config, - bool initDfsClient); + explicit AbfsConfig(std::string_view path, const config::ConfigBase& config); - std::string authType() const { - return authType_; - } + std::unique_ptr getReadFileClient(); - std::string fileSystem() const { - return fileSystem_; - } + std::unique_ptr getWriteFileClient(); std::string filePath() const { return filePath_; } - std::string connectionString() const { - return connectionString_; - } - - std::string url() const { - return url_; - } - - std::string urlWithSasToken() const { - return urlWithSasToken_; + /// Test only. + std::string fileSystem() const { + return fileSystem_; } - std::shared_ptr tokenCredential() - const { - return tokenCredential_; + /// Test only. + std::string connectionString() const { + return connectionString_; } + /// Test only. std::string tenentId() const { return tenentId_; } + /// Test only. std::string authorityHost() const { return authorityHost_; } private: + std::string getUrl(bool withblobSuffix); + + std::string authType_; + // Container name is called FileSystem in some Azure API. std::string fileSystem_; std::string filePath_; - std::string authType_; std::string connectionString_; - std::string urlWithSasToken_; - std::string url_; + + bool isHttps_; + std::string accountNameWithSuffix_; + + std::string sas_; + std::string tenentId_; std::string authorityHost_; std::shared_ptr tokenCredential_; diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp index cecf8e071db1..7e63c2df1438 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp @@ -16,7 +16,6 @@ #include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h" -#include #include #include #include @@ -27,7 +26,6 @@ #include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" namespace facebook::velox::filesystems { -using namespace Azure::Storage::Blobs; class AbfsReadFile::Impl { constexpr static uint64_t kNaturalReadSize = 4 << 20; // 4M @@ -35,20 +33,9 @@ class AbfsReadFile::Impl { public: explicit Impl(std::string_view path, const config::ConfigBase& config) { - auto abfsConfig = AbfsConfig(path, config, false); + auto abfsConfig = AbfsConfig(path, config); filePath_ = abfsConfig.filePath(); - if (abfsConfig.authType() == "SAS") { - fileClient_ = std::make_unique(abfsConfig.urlWithSasToken()); - } else if (abfsConfig.authType() == "OAuth") { - fileClient_ = std::make_unique( - abfsConfig.url(), abfsConfig.tokenCredential()); - } else { - fileClient_ = - std::make_unique(BlobClient::CreateFromConnectionString( - abfsConfig.connectionString(), - abfsConfig.fileSystem(), - filePath_)); - } + fileClient_ = abfsConfig.getReadFileClient(); } void initialize(const FileOptions& options) { diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp index 74ce50f2acd9..9576252db0af 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp @@ -15,12 +15,9 @@ */ #include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" -#include #include "velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h" -using namespace Azure::Storage::Files::DataLake; - namespace facebook::velox::filesystems { class DataLakeFileClientWrapper final : public AzureDataLakeFileClient { public: @@ -31,7 +28,8 @@ class DataLakeFileClientWrapper final : public AzureDataLakeFileClient { client_->Create(); } - Models::PathProperties getProperties() override { + Azure::Storage::Files::DataLake::Models::PathProperties getProperties() + override { return client_->GetProperties().Value; } @@ -120,23 +118,10 @@ class AbfsWriteFile::Impl { AbfsWriteFile::AbfsWriteFile( std::string_view path, const config::ConfigBase& config) { - auto abfsConfig = AbfsConfig(path, config, true); - std::unique_ptr clientWrapper; - if (abfsConfig.authType() == "SAS") { - clientWrapper = std::make_unique( - std::make_unique(abfsConfig.urlWithSasToken())); - } else if (abfsConfig.authType() == "OAuth") { - clientWrapper = std::make_unique( - std::make_unique( - abfsConfig.url(), abfsConfig.tokenCredential())); - } else { - clientWrapper = std::make_unique( - std::make_unique( - DataLakeFileClient::CreateFromConnectionString( - abfsConfig.connectionString(), - abfsConfig.fileSystem(), - abfsConfig.filePath()))); - } + auto abfsConfig = AbfsConfig(path, config); + std::unique_ptr clientWrapper = + std::make_unique( + abfsConfig.getWriteFileClient()); impl_ = std::make_unique(path, clientWrapper); } diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsCommonTest.cpp b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsCommonTest.cpp index 6279f21e5122..053971534360 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsCommonTest.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsCommonTest.cpp @@ -40,7 +40,7 @@ TEST(AbfsConfigTest, authType) { false); VELOX_ASSERT_USER_THROW( std::make_unique( - "abfss://foo@efg.dfs.core.windows.net/test.txt", config, false), + "abfss://foo@efg.dfs.core.windows.net/test.txt", config), "Unsupported auth type Custom, supported auth types are SharedKey, OAuth and SAS."); } @@ -62,23 +62,30 @@ TEST(AbfsConfigTest, clientSecretOAuth) { false); VELOX_ASSERT_USER_THROW( std::make_unique( - "abfss://foo@bar1.dfs.core.windows.net/test.txt", config, false), + "abfss://foo@bar1.dfs.core.windows.net/test.txt", config), "Config fs.azure.account.oauth2.client.id.bar1.dfs.core.windows.net not found"); VELOX_ASSERT_USER_THROW( std::make_unique( - "abfss://foo@bar2.dfs.core.windows.net/test.txt", config, false), + "abfss://foo@bar2.dfs.core.windows.net/test.txt", config), "Config fs.azure.account.oauth2.client.secret.bar2.dfs.core.windows.net not found"); VELOX_ASSERT_USER_THROW( std::make_unique( - "abfss://foo@bar3.dfs.core.windows.net/test.txt", config, false), + "abfss://foo@bar3.dfs.core.windows.net/test.txt", config), "Config fs.azure.account.oauth2.client.endpoint.bar3.dfs.core.windows.net not found"); - auto abfsConfig = AbfsConfig( - "abfss://abc@efg.dfs.core.windows.net/file/test.txt", config, true); + auto abfsConfig = + AbfsConfig("abfss://abc@efg.dfs.core.windows.net/file/test.txt", config); EXPECT_EQ(abfsConfig.tenentId(), "{TENANTID}"); EXPECT_EQ(abfsConfig.authorityHost(), "https://login.microsoftonline.com/"); + auto readClient = abfsConfig.getReadFileClient(); + EXPECT_EQ( + readClient->GetUrl(), + "https://efg.blob.core.windows.net/abc/file/test.txt"); + auto writeClient = abfsConfig.getWriteFileClient(); + // GetUrl retrieves the value from the internal blob client, which represents + // the blob's path as well. EXPECT_EQ( - abfsConfig.url(), "https://efg.dfs.core.windows.net/abc/file/test.txt"); - EXPECT_TRUE(abfsConfig.tokenCredential() != nullptr); + writeClient->GetUrl(), + "https://efg.blob.core.windows.net/abc/file/test.txt"); } TEST(AbfsConfigTest, sasToken) { @@ -89,12 +96,19 @@ TEST(AbfsConfigTest, sasToken) { false); VELOX_ASSERT_USER_THROW( std::make_unique( - "abfss://foo@efg.dfs.core.windows.net/test.txt", config, false), + "abfss://foo@efg.dfs.core.windows.net/test.txt", config), "Config fs.azure.sas.fixed.token.efg.dfs.core.windows.net not found"); auto abfsConfig = - AbfsConfig("abfs://abc@bar.dfs.core.windows.net/file", config, false); + AbfsConfig("abfs://abc@bar.dfs.core.windows.net/file", config); + auto readClient = abfsConfig.getReadFileClient(); + EXPECT_EQ( + readClient->GetUrl(), + "http://bar.blob.core.windows.net/abc/file?sas=test"); + auto writeClient = abfsConfig.getWriteFileClient(); + // GetUrl retrieves the value from the internal blob client, which represents + // the blob's path as well. EXPECT_EQ( - abfsConfig.urlWithSasToken(), + writeClient->GetUrl(), "http://bar.blob.core.windows.net/abc/file?sas=test"); } @@ -107,7 +121,7 @@ TEST(AbfsConfigTest, sharedKey) { false); auto abfsConfig = - AbfsConfig("abfs://abc@efg.dfs.core.windows.net/file", config, false); + AbfsConfig("abfs://abc@efg.dfs.core.windows.net/file", config); EXPECT_EQ(abfsConfig.fileSystem(), "abc"); EXPECT_EQ(abfsConfig.filePath(), "file"); EXPECT_EQ( @@ -116,8 +130,7 @@ TEST(AbfsConfigTest, sharedKey) { auto abfssConfig = AbfsConfig( "abfss://abc@foobar.dfs.core.windows.net/sf_1/store_sales/ss_sold_date_sk=2450816/part-00002-a29c25f1-4638-494e-8428-a84f51dcea41.c000.snappy.parquet", - config, - false); + config); EXPECT_EQ(abfssConfig.fileSystem(), "abc"); EXPECT_EQ( abfssConfig.filePath(), @@ -128,9 +141,7 @@ TEST(AbfsConfigTest, sharedKey) { // Test with special character space. auto abfssConfigWithSpecialCharacters = AbfsConfig( - "abfss://foo@bar.dfs.core.windows.net/main@dir/sub dir/test.txt", - config, - false); + "abfss://foo@bar.dfs.core.windows.net/main@dir/sub dir/test.txt", config); EXPECT_EQ(abfssConfigWithSpecialCharacters.fileSystem(), "foo"); EXPECT_EQ( @@ -138,8 +149,6 @@ TEST(AbfsConfigTest, sharedKey) { VELOX_ASSERT_USER_THROW( std::make_unique( - "abfss://foo@otheraccount.dfs.core.windows.net/test.txt", - config, - false), + "abfss://foo@otheraccount.dfs.core.windows.net/test.txt", config), "Config fs.azure.account.key.otheraccount.dfs.core.windows.net not found"); } diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.cpp b/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.cpp index 3b54fe708437..726c2c531be8 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.cpp @@ -108,7 +108,7 @@ AzuriteServer::AzuriteServer(int64_t port) : port_(port) { } void AzuriteServer::addFile(std::string source) { - AbfsConfig conf(fileURI(), *hiveConfig(), false); + AbfsConfig conf(fileURI(), *hiveConfig()); auto containerClient = BlobContainerClient::CreateFromConnectionString( conf.connectionString(), container_); containerClient.CreateIfNotExists();