diff --git a/CMakeLists.txt b/CMakeLists.txt index 8ddbbeb76e38..a106a0c2ad5d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -254,6 +254,7 @@ if(VELOX_ENABLE_ABFS) endif() # files-datalake is built on blobs find_package(azure-storage-files-datalake-cpp CONFIG REQUIRED) + find_package(azure-identity-cpp CONFIG REQUIRED) add_definitions(-DVELOX_ENABLE_ABFS) endif() diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp index b8607321e117..fbbb23465966 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp @@ -19,18 +19,20 @@ #include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h" +#include + namespace facebook::velox::filesystems { AbfsConfig::AbfsConfig( std::string_view path, const config::ConfigBase& config) { std::string_view file; - bool isHttps = true; + isHttps_ = true; if (path.find(kAbfssScheme) == 0) { file = path.substr(kAbfssScheme.size()); } else if (path.find(kAbfsScheme) == 0) { file = path.substr(kAbfsScheme.size()); - isHttps = false; + isHttps_ = false; } else { VELOX_FAIL("Invalid ABFS Path {}", path); } @@ -39,30 +41,118 @@ AbfsConfig::AbfsConfig( fileSystem_ = file.substr(0, firstAt); auto firstSep = file.find_first_of("/"); filePath_ = file.substr(firstSep + 1); + accountNameWithSuffix_ = file.substr(firstAt + 1, firstSep - firstAt - 1); - auto accountNameWithSuffix = file.substr(firstAt + 1, firstSep - firstAt - 1); - auto firstDot = accountNameWithSuffix.find_first_of("."); - auto accountName = accountNameWithSuffix.substr(0, firstDot); - auto endpointSuffix = accountNameWithSuffix.substr(firstDot + 5); - auto credKey = fmt::format("fs.azure.account.key.{}", accountNameWithSuffix); - std::stringstream ss; - ss << "DefaultEndpointsProtocol=" << (isHttps ? 
"https" : "http"); - ss << ";AccountName=" << accountName; - - if (config.valueExists(credKey)) { + auto authTypeKey = + fmt::format("{}.{}", kAzureAccountAuthType, accountNameWithSuffix_); + authType_ = kAzureSharedKeyAuthType; + if (config.valueExists(authTypeKey)) { + authType_ = config.get(authTypeKey).value(); + } + if (authType_ == kAzureSharedKeyAuthType) { + auto credKey = + fmt::format("{}.{}", kAzureAccountKey, accountNameWithSuffix_); + VELOX_USER_CHECK( + config.valueExists(credKey), "Config {} not found", credKey); + auto firstDot = accountNameWithSuffix_.find_first_of("."); + auto accountName = accountNameWithSuffix_.substr(0, firstDot); + auto endpointSuffix = accountNameWithSuffix_.substr(firstDot + 5); + std::stringstream ss; + ss << "DefaultEndpointsProtocol=" << (isHttps_ ? "https" : "http"); + ss << ";AccountName=" << accountName; ss << ";AccountKey=" << config.get(credKey).value(); + ss << ";EndpointSuffix=" << endpointSuffix; + + if (config.valueExists(kAzureBlobEndpoint)) { + ss << ";BlobEndpoint=" + << config.get(kAzureBlobEndpoint).value(); + } + ss << ";"; + connectionString_ = ss.str(); + } else if (authType_ == kAzureOAuthAuthType) { + auto clientIdKey = fmt::format( + "{}.{}", kAzureAccountOAuth2ClientId, accountNameWithSuffix_); + auto clientSecretKey = fmt::format( + "{}.{}", kAzureAccountOAuth2ClientSecret, accountNameWithSuffix_); + auto clientEndpointKey = fmt::format( + "{}.{}", kAzureAccountOAuth2ClientEndpoint, accountNameWithSuffix_); + VELOX_USER_CHECK( + config.valueExists(clientIdKey), "Config {} not found", clientIdKey); + VELOX_USER_CHECK( + config.valueExists(clientSecretKey), + "Config {} not found", + clientSecretKey); + VELOX_USER_CHECK( + config.valueExists(clientEndpointKey), + "Config {} not found", + clientEndpointKey); + auto clientEndpoint = config.get(clientEndpointKey).value(); + auto firstSep = clientEndpoint.find_first_of("/", /* https:// */ 8); + authorityHost_ = clientEndpoint.substr(0, firstSep + 1); + 
auto sedondSep = clientEndpoint.find_first_of("/", firstSep + 1); + tenentId_ = clientEndpoint.substr(firstSep + 1, sedondSep - firstSep - 1); + Azure::Identity::ClientSecretCredentialOptions options; + options.AuthorityHost = authorityHost_; + tokenCredential_ = + std::make_shared( + tenentId_, + config.get(clientIdKey).value(), + config.get(clientSecretKey).value(), + options); + } else if (authType_ == kAzureSASAuthType) { + auto sasKey = fmt::format("{}.{}", kAzureSASKey, accountNameWithSuffix_); + VELOX_USER_CHECK(config.valueExists(sasKey), "Config {} not found", sasKey); + sas_ = config.get(sasKey).value(); + } else { + VELOX_USER_FAIL( + "Unsupported auth type {}, supported auth types are SharedKey, OAuth and SAS.", + authType_); + } +} + +std::unique_ptr AbfsConfig::getReadFileClient() { + if (authType_ == kAzureSASAuthType) { + auto url = getUrl(true); + return std::make_unique(fmt::format("{}?{}", url, sas_)); + } else if (authType_ == kAzureOAuthAuthType) { + auto url = getUrl(true); + return std::make_unique(url, tokenCredential_); } else { - VELOX_USER_FAIL("Config {} not found", credKey); + return std::make_unique(BlobClient::CreateFromConnectionString( + connectionString_, fileSystem_, filePath_)); } +} - ss << ";EndpointSuffix=" << endpointSuffix; +std::unique_ptr AbfsConfig::getWriteFileClient() { + if (authType_ == kAzureSASAuthType) { + auto url = getUrl(false); + return std::make_unique( + fmt::format("{}?{}", url, sas_)); + } else if (authType_ == kAzureOAuthAuthType) { + auto url = getUrl(false); + return std::make_unique(url, tokenCredential_); + } else { + return std::make_unique( + DataLakeFileClient::CreateFromConnectionString( + connectionString_, fileSystem_, filePath_)); + } +} - if (config.valueExists(kAzureBlobEndpoint)) { - ss << ";BlobEndpoint=" - << config.get(kAzureBlobEndpoint).value(); +std::string AbfsConfig::getUrl(bool withblobSuffix) { + std::string accountNameWithSuffixForUrl(accountNameWithSuffix_); + if (withblobSuffix) 
{ + // We should use correct suffix for blob client. + size_t start_pos = accountNameWithSuffixForUrl.find("dfs"); + if (start_pos != std::string::npos) { + accountNameWithSuffixForUrl.replace(start_pos, 3, "blob"); + } } - ss << ";"; - connectionString_ = ss.str(); + return fmt::format( + "{}{}/{}/{}", + isHttps_ ? "https://" : "http://", + accountNameWithSuffixForUrl, + fileSystem_, + filePath_); } } // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h b/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h index 6fcc6b4a8b30..109e1576ec68 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h @@ -16,9 +16,15 @@ #pragma once +#include +#include +#include #include #include +using namespace Azure::Storage::Blobs; +using namespace Azure::Storage::Files::DataLake; + namespace facebook::velox::config { class ConfigBase; } @@ -26,34 +32,84 @@ class ConfigBase; namespace facebook::velox::filesystems { // This is used to specify the Azurite endpoint in testing. -static std::string kAzureBlobEndpoint{"fs.azure.blob-endpoint"}; +static constexpr const char* kAzureBlobEndpoint{"fs.azure.blob-endpoint"}; + +// The authentication mechanism is set in `fs.azure.account.auth.type` (or the +// account specific variant). The supported values are SharedKey, OAuth and SAS. +static constexpr const char* kAzureAccountAuthType = + "fs.azure.account.auth.type"; + +static constexpr const char* kAzureAccountKey = "fs.azure.account.key"; + +static constexpr const char* kAzureSASKey = "fs.azure.sas.fixed.token"; + +static constexpr const char* kAzureAccountOAuth2ClientId = + "fs.azure.account.oauth2.client.id"; + +static constexpr const char* kAzureAccountOAuth2ClientSecret = + "fs.azure.account.oauth2.client.secret"; + +// Token end point, this can be found through Azure portal. 
For example: +// https://login.microsoftonline.com/{TENANTID}/oauth2/token +static constexpr const char* kAzureAccountOAuth2ClientEndpoint = + "fs.azure.account.oauth2.client.endpoint"; + +static constexpr const char* kAzureSharedKeyAuthType = "SharedKey"; + +static constexpr const char* kAzureOAuthAuthType = "OAuth"; + +static constexpr const char* kAzureSASAuthType = "SAS"; class AbfsConfig { public: explicit AbfsConfig(std::string_view path, const config::ConfigBase& config); - std::string identity() const { - const auto hash = folly::Hash(); - return std::to_string(hash(connectionString_)); + std::unique_ptr getReadFileClient(); + + std::unique_ptr getWriteFileClient(); + + std::string filePath() const { + return filePath_; } + /// Test only. + std::string fileSystem() const { + return fileSystem_; + } + + /// Test only. std::string connectionString() const { return connectionString_; } - std::string fileSystem() const { - return fileSystem_; + /// Test only. + std::string tenentId() const { + return tenentId_; } - std::string filePath() const { - return filePath_; + /// Test only. + std::string authorityHost() const { + return authorityHost_; } private: + std::string getUrl(bool withblobSuffix); + + std::string authType_; + // Container name is called FileSystem in some Azure API. 
std::string fileSystem_; std::string filePath_; std::string connectionString_; + + bool isHttps_; + std::string accountNameWithSuffix_; + + std::string sas_; + + std::string tenentId_; + std::string authorityHost_; + std::shared_ptr tokenCredential_; }; } // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp index 85c381201054..7e63c2df1438 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp @@ -16,7 +16,6 @@ #include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h" -#include #include #include #include @@ -27,7 +26,6 @@ #include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" namespace facebook::velox::filesystems { -using namespace Azure::Storage::Blobs; class AbfsReadFile::Impl { constexpr static uint64_t kNaturalReadSize = 4 << 20; // 4M @@ -35,11 +33,9 @@ class AbfsReadFile::Impl { public: explicit Impl(std::string_view path, const config::ConfigBase& config) { - auto account = AbfsConfig(path, config); - filePath_ = account.filePath(); - fileClient_ = - std::make_unique(BlobClient::CreateFromConnectionString( - account.connectionString(), account.fileSystem(), filePath_)); + auto abfsConfig = AbfsConfig(path, config); + filePath_ = abfsConfig.filePath(); + fileClient_ = abfsConfig.getReadFileClient(); } void initialize(const FileOptions& options) { @@ -59,7 +55,6 @@ class AbfsReadFile::Impl { } catch (Azure::Storage::StorageException& e) { throwStorageExceptionWithOperationDetails("GetProperties", filePath_, e); } - VELOX_CHECK_GE(length_, 0); } @@ -141,7 +136,6 @@ class AbfsReadFile::Impl { Azure::Storage::Blobs::DownloadBlobOptions blob; blob.Range = range; - auto response = fileClient_->Download(blob); response.Value.BodyStream->ReadToCount( reinterpret_cast(position), length); diff --git 
a/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp index 5f832cb0e7d1..9576252db0af 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp @@ -15,12 +15,9 @@ */ #include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" -#include #include "velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h" -using namespace Azure::Storage::Files::DataLake; - namespace facebook::velox::filesystems { class DataLakeFileClientWrapper final : public AzureDataLakeFileClient { public: @@ -31,7 +28,8 @@ class DataLakeFileClientWrapper final : public AzureDataLakeFileClient { client_->Create(); } - Models::PathProperties getProperties() override { + Azure::Storage::Files::DataLake::Models::PathProperties getProperties() + override { return client_->GetProperties().Value; } @@ -120,16 +118,11 @@ class AbfsWriteFile::Impl { AbfsWriteFile::AbfsWriteFile( std::string_view path, const config::ConfigBase& config) { - auto abfsAccount = AbfsConfig(path, config); - std::unique_ptr client = + auto abfsConfig = AbfsConfig(path, config); + std::unique_ptr clientWrapper = std::make_unique( - std::make_unique( - DataLakeFileClient::CreateFromConnectionString( - abfsAccount.connectionString(), - abfsAccount.fileSystem(), - abfsAccount.filePath()))); - - impl_ = std::make_unique(path, client); + abfsConfig.getWriteFileClient()); + impl_ = std::make_unique(path, clientWrapper); } AbfsWriteFile::AbfsWriteFile( diff --git a/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt index b6a55e2842d3..799e93830373 100644 --- a/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt @@ -30,6 +30,7 @@ if(VELOX_ENABLE_ABFS) velox_core 
velox_hive_config velox_dwio_common_exception + Azure::azure-identity Azure::azure-storage-blobs Azure::azure-storage-files-datalake Folly::folly diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsCommonTest.cpp b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsCommonTest.cpp index f40edfa6e039..053971534360 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsCommonTest.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsCommonTest.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "velox/common/base/tests/GTestUtils.h" #include "velox/common/config/Config.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h" @@ -32,9 +33,89 @@ TEST(AbfsUtilsTest, isAbfsFile) { EXPECT_TRUE(isAbfsFile("abfss://test@test.dfs.core.windows.net/test")); } -TEST(AbfsUtilsTest, abfsConfig) { +TEST(AbfsConfigTest, authType) { + const config::ConfigBase config( + {{"fs.azure.account.auth.type.efg.dfs.core.windows.net", "Custom"}, + {"fs.azure.account.key.efg.dfs.core.windows.net", "456"}}, + false); + VELOX_ASSERT_USER_THROW( + std::make_unique( + "abfss://foo@efg.dfs.core.windows.net/test.txt", config), + "Unsupported auth type Custom, supported auth types are SharedKey, OAuth and SAS."); +} + +TEST(AbfsConfigTest, clientSecretOAuth) { + const config::ConfigBase config( + {{"fs.azure.account.auth.type.efg.dfs.core.windows.net", "OAuth"}, + {"fs.azure.account.auth.type.bar1.dfs.core.windows.net", "OAuth"}, + {"fs.azure.account.auth.type.bar2.dfs.core.windows.net", "OAuth"}, + {"fs.azure.account.auth.type.bar3.dfs.core.windows.net", "OAuth"}, + {"fs.azure.account.oauth2.client.id.efg.dfs.core.windows.net", "test"}, + {"fs.azure.account.oauth2.client.secret.efg.dfs.core.windows.net", + "test"}, + {"fs.azure.account.oauth2.client.endpoint.efg.dfs.core.windows.net", + "https://login.microsoftonline.com/{TENANTID}/oauth2/token"}, + 
{"fs.azure.account.oauth2.client.id.bar2.dfs.core.windows.net", "test"}, + {"fs.azure.account.oauth2.client.id.bar3.dfs.core.windows.net", "test"}, + {"fs.azure.account.oauth2.client.secret.bar3.dfs.core.windows.net", + "test"}}, + false); + VELOX_ASSERT_USER_THROW( + std::make_unique( + "abfss://foo@bar1.dfs.core.windows.net/test.txt", config), + "Config fs.azure.account.oauth2.client.id.bar1.dfs.core.windows.net not found"); + VELOX_ASSERT_USER_THROW( + std::make_unique( + "abfss://foo@bar2.dfs.core.windows.net/test.txt", config), + "Config fs.azure.account.oauth2.client.secret.bar2.dfs.core.windows.net not found"); + VELOX_ASSERT_USER_THROW( + std::make_unique( + "abfss://foo@bar3.dfs.core.windows.net/test.txt", config), + "Config fs.azure.account.oauth2.client.endpoint.bar3.dfs.core.windows.net not found"); + auto abfsConfig = + AbfsConfig("abfss://abc@efg.dfs.core.windows.net/file/test.txt", config); + EXPECT_EQ(abfsConfig.tenentId(), "{TENANTID}"); + EXPECT_EQ(abfsConfig.authorityHost(), "https://login.microsoftonline.com/"); + auto readClient = abfsConfig.getReadFileClient(); + EXPECT_EQ( + readClient->GetUrl(), + "https://efg.blob.core.windows.net/abc/file/test.txt"); + auto writeClient = abfsConfig.getWriteFileClient(); + // GetUrl retrieves the value from the internal blob client, which represents + // the blob's path as well. 
+ EXPECT_EQ( + writeClient->GetUrl(), + "https://efg.blob.core.windows.net/abc/file/test.txt"); +} + +TEST(AbfsConfigTest, sasToken) { + const config::ConfigBase config( + {{"fs.azure.account.auth.type.efg.dfs.core.windows.net", "SAS"}, + {"fs.azure.account.auth.type.bar.dfs.core.windows.net", "SAS"}, + {"fs.azure.sas.fixed.token.bar.dfs.core.windows.net", "sas=test"}}, + false); + VELOX_ASSERT_USER_THROW( + std::make_unique( + "abfss://foo@efg.dfs.core.windows.net/test.txt", config), + "Config fs.azure.sas.fixed.token.efg.dfs.core.windows.net not found"); + auto abfsConfig = + AbfsConfig("abfs://abc@bar.dfs.core.windows.net/file", config); + auto readClient = abfsConfig.getReadFileClient(); + EXPECT_EQ( + readClient->GetUrl(), + "http://bar.blob.core.windows.net/abc/file?sas=test"); + auto writeClient = abfsConfig.getWriteFileClient(); + // GetUrl retrieves the value from the internal blob client, which represents + // the blob's path as well. + EXPECT_EQ( + writeClient->GetUrl(), + "http://bar.blob.core.windows.net/abc/file?sas=test"); +} + +TEST(AbfsConfigTest, sharedKey) { const config::ConfigBase config( {{"fs.azure.account.key.efg.dfs.core.windows.net", "123"}, + {"fs.azure.account.auth.type.efg.dfs.core.windows.net", "SharedKey"}, {"fs.azure.account.key.foobar.dfs.core.windows.net", "456"}, {"fs.azure.account.key.bar.dfs.core.windows.net", "789"}}, false); @@ -65,4 +146,9 @@ TEST(AbfsUtilsTest, abfsConfig) { EXPECT_EQ(abfssConfigWithSpecialCharacters.fileSystem(), "foo"); EXPECT_EQ( abfssConfigWithSpecialCharacters.filePath(), "main@dir/sub dir/test.txt"); + + VELOX_ASSERT_USER_THROW( + std::make_unique( + "abfss://foo@otheraccount.dfs.core.windows.net/test.txt", config), + "Config fs.azure.account.key.otheraccount.dfs.core.windows.net not found"); } diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 3ce8e224f870..761ab6043600 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -681,12 +681,42 @@ These semantics are similar to 
the `Apache Hadoop-Aws module + * - fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net + - string + - SharedKey + - Specifies the authentication mechanism to use for Azure storage accounts. + **Allowed values:** "SharedKey", "OAuth", "SAS". + "SharedKey": Uses the storage account name and key for authentication. + "OAuth": Utilizes OAuth tokens for secure authentication. + "SAS": Employs Shared Access Signatures for granular access control. * - fs.azure.account.key.<storage-account>.dfs.core.windows.net - string - - - The credentials to access the specific Azure Blob Storage account, replace <storage-account> with the name of your Azure Storage account. - This property aligns with how Spark configures Azure account key credentials for accessing Azure storage, by setting this property multiple - times with different storage account names, you can access multiple Azure storage accounts. + - The credentials to access the specific Azure Blob Storage account, replace <storage-account> with the name of your Azure Storage account. + This property aligns with how Spark configures Azure account key credentials for accessing Azure storage, by setting this property multiple + times with different storage account names, you can access multiple Azure storage accounts. + * - fs.azure.sas.fixed.token.<storage-account>.dfs.core.windows.net + - string + - + - Specifies a fixed SAS (Shared Access Signature) token for accessing Azure storage. + This token provides scoped and time-limited access to specific resources. + Use this property when a pre-generated SAS token is used for authentication. + * - fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net + - string + - + - Specifies the client ID of the Azure AD application used for OAuth 2.0 authentication. + This client ID is required when using OAuth as the authentication type. + * - fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net + - string + - + - Specifies the client secret of the Azure AD application used for OAuth 2.0 authentication. 
+ This secret is required in conjunction with the client ID to authenticate the application. + * - fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net + - string + - + - Specifies the OAuth 2.0 token endpoint URL for the Azure AD application. + This endpoint is used to acquire access tokens for authenticating with Azure storage. + The URL follows the format: `https://login.microsoftonline.com/<tenant-id>/oauth2/token`. Presto-specific Configuration -----------------------------