Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ABFS): Support SAS and OAuth config #11623

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ if(VELOX_ENABLE_ABFS)
endif()
# files-datalake is built on blobs
find_package(azure-storage-files-datalake-cpp CONFIG REQUIRED)
find_package(azure-identity-cpp CONFIG REQUIRED)
add_definitions(-DVELOX_ENABLE_ABFS)
endif()

Expand Down
128 changes: 109 additions & 19 deletions velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"

#include <azure/identity/client_secret_credential.hpp>

namespace facebook::velox::filesystems {

AbfsConfig::AbfsConfig(
std::string_view path,
const config::ConfigBase& config) {
std::string_view file;
bool isHttps = true;
isHttps_ = true;
if (path.find(kAbfssScheme) == 0) {
file = path.substr(kAbfssScheme.size());
} else if (path.find(kAbfsScheme) == 0) {
file = path.substr(kAbfsScheme.size());
isHttps = false;
isHttps_ = false;
} else {
VELOX_FAIL("Invalid ABFS Path {}", path);
}
Expand All @@ -39,30 +41,118 @@ AbfsConfig::AbfsConfig(
fileSystem_ = file.substr(0, firstAt);
auto firstSep = file.find_first_of("/");
filePath_ = file.substr(firstSep + 1);
accountNameWithSuffix_ = file.substr(firstAt + 1, firstSep - firstAt - 1);

auto accountNameWithSuffix = file.substr(firstAt + 1, firstSep - firstAt - 1);
auto firstDot = accountNameWithSuffix.find_first_of(".");
auto accountName = accountNameWithSuffix.substr(0, firstDot);
auto endpointSuffix = accountNameWithSuffix.substr(firstDot + 5);
auto credKey = fmt::format("fs.azure.account.key.{}", accountNameWithSuffix);
std::stringstream ss;
ss << "DefaultEndpointsProtocol=" << (isHttps ? "https" : "http");
ss << ";AccountName=" << accountName;

if (config.valueExists(credKey)) {
auto authTypeKey =
fmt::format("{}.{}", kAzureAccountAuthType, accountNameWithSuffix_);
authType_ = kAzureSharedKeyAuthType;
if (config.valueExists(authTypeKey)) {
authType_ = config.get<std::string>(authTypeKey).value();
zhli1142015 marked this conversation as resolved.
Show resolved Hide resolved
}
if (authType_ == kAzureSharedKeyAuthType) {
auto credKey =
fmt::format("{}.{}", kAzureAccountKey, accountNameWithSuffix_);
VELOX_USER_CHECK(
config.valueExists(credKey), "Config {} not found", credKey);
auto firstDot = accountNameWithSuffix_.find_first_of(".");
auto accountName = accountNameWithSuffix_.substr(0, firstDot);
auto endpointSuffix = accountNameWithSuffix_.substr(firstDot + 5);
std::stringstream ss;
ss << "DefaultEndpointsProtocol=" << (isHttps_ ? "https" : "http");
ss << ";AccountName=" << accountName;
ss << ";AccountKey=" << config.get<std::string>(credKey).value();
ss << ";EndpointSuffix=" << endpointSuffix;

if (config.valueExists(kAzureBlobEndpoint)) {
ss << ";BlobEndpoint="
<< config.get<std::string>(kAzureBlobEndpoint).value();
}
ss << ";";
connectionString_ = ss.str();
} else if (authType_ == kAzureOAuthAuthType) {
auto clientIdKey = fmt::format(
"{}.{}", kAzureAccountOAuth2ClientId, accountNameWithSuffix_);
auto clientSecretKey = fmt::format(
"{}.{}", kAzureAccountOAuth2ClientSecret, accountNameWithSuffix_);
auto clientEndpointKey = fmt::format(
"{}.{}", kAzureAccountOAuth2ClientEndpoint, accountNameWithSuffix_);
VELOX_USER_CHECK(
config.valueExists(clientIdKey), "Config {} not found", clientIdKey);
VELOX_USER_CHECK(
config.valueExists(clientSecretKey),
"Config {} not found",
clientSecretKey);
VELOX_USER_CHECK(
config.valueExists(clientEndpointKey),
"Config {} not found",
clientEndpointKey);
auto clientEndpoint = config.get<std::string>(clientEndpointKey).value();
auto firstSep = clientEndpoint.find_first_of("/", /* https:// */ 8);
authorityHost_ = clientEndpoint.substr(0, firstSep + 1);
auto sedondSep = clientEndpoint.find_first_of("/", firstSep + 1);
tenentId_ = clientEndpoint.substr(firstSep + 1, sedondSep - firstSep - 1);
Azure::Identity::ClientSecretCredentialOptions options;
options.AuthorityHost = authorityHost_;
tokenCredential_ =
std::make_shared<Azure::Identity::ClientSecretCredential>(
tenentId_,
config.get<std::string>(clientIdKey).value(),
config.get<std::string>(clientSecretKey).value(),
options);
} else if (authType_ == kAzureSASAuthType) {
auto sasKey = fmt::format("{}.{}", kAzureSASKey, accountNameWithSuffix_);
VELOX_USER_CHECK(config.valueExists(sasKey), "Config {} not found", sasKey);
sas_ = config.get<std::string>(sasKey).value();
} else {
VELOX_USER_FAIL(
"Unsupported auth type {}, supported auth types are SharedKey, OAuth and SAS.",
authType_);
}
}

std::unique_ptr<BlobClient> AbfsConfig::getReadFileClient() {
if (authType_ == kAzureSASAuthType) {
auto url = getUrl(true);
return std::make_unique<BlobClient>(fmt::format("{}?{}", url, sas_));
} else if (authType_ == kAzureOAuthAuthType) {
auto url = getUrl(true);
return std::make_unique<BlobClient>(url, tokenCredential_);
} else {
VELOX_USER_FAIL("Config {} not found", credKey);
return std::make_unique<BlobClient>(BlobClient::CreateFromConnectionString(
connectionString_, fileSystem_, filePath_));
}
}

ss << ";EndpointSuffix=" << endpointSuffix;
std::unique_ptr<DataLakeFileClient> AbfsConfig::getWriteFileClient() {
if (authType_ == kAzureSASAuthType) {
auto url = getUrl(false);
return std::make_unique<DataLakeFileClient>(
fmt::format("{}?{}", url, sas_));
} else if (authType_ == kAzureOAuthAuthType) {
auto url = getUrl(false);
return std::make_unique<DataLakeFileClient>(url, tokenCredential_);
} else {
return std::make_unique<DataLakeFileClient>(
DataLakeFileClient::CreateFromConnectionString(
connectionString_, fileSystem_, filePath_));
}
}

if (config.valueExists(kAzureBlobEndpoint)) {
ss << ";BlobEndpoint="
<< config.get<std::string>(kAzureBlobEndpoint).value();
std::string AbfsConfig::getUrl(bool withblobSuffix) {
std::string accountNameWithSuffixForUrl(accountNameWithSuffix_);
if (withblobSuffix) {
// We should use correct suffix for blob client.
size_t start_pos = accountNameWithSuffixForUrl.find("dfs");
if (start_pos != std::string::npos) {
accountNameWithSuffixForUrl.replace(start_pos, 3, "blob");
}
}
ss << ";";
connectionString_ = ss.str();
return fmt::format(
"{}{}/{}/{}",
isHttps_ ? "https://" : "http://",
accountNameWithSuffixForUrl,
fileSystem_,
filePath_);
}

} // namespace facebook::velox::filesystems
72 changes: 64 additions & 8 deletions velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,100 @@

#pragma once

#include <azure/core/credentials/credentials.hpp>
#include <azure/storage/blobs/blob_client.hpp>
#include <azure/storage/files/datalake.hpp>
#include <folly/hash/Hash.h>
#include <string>

using namespace Azure::Storage::Blobs;
using namespace Azure::Storage::Files::DataLake;

namespace facebook::velox::config {
class ConfigBase;
}

namespace facebook::velox::filesystems {

// This is used to specify the Azurite endpoint in testing.
static std::string kAzureBlobEndpoint{"fs.azure.blob-endpoint"};
static constexpr const char* kAzureBlobEndpoint{"fs.azure.blob-endpoint"};

// The authentication mechanism is set in `fs.azure.account.auth.type` (or the
// account specific variant). The supported values are SharedKey, OAuth and SAS.
static constexpr const char* kAzureAccountAuthType =
"fs.azure.account.auth.type";

static constexpr const char* kAzureAccountKey = "fs.azure.account.key";

static constexpr const char* kAzureSASKey = "fs.azure.sas.fixed.token";
zhli1142015 marked this conversation as resolved.
Show resolved Hide resolved

static constexpr const char* kAzureAccountOAuth2ClientId =
"fs.azure.account.oauth2.client.id";

static constexpr const char* kAzureAccountOAuth2ClientSecret =
zhli1142015 marked this conversation as resolved.
Show resolved Hide resolved
"fs.azure.account.oauth2.client.secret";

// Token end point, this can be found through Azure portal. For example:
// https://login.microsoftonline.com/{TENANTID}/oauth2/token
static constexpr const char* kAzureAccountOAuth2ClientEndpoint =
"fs.azure.account.oauth2.client.endpoint";

static constexpr const char* kAzureSharedKeyAuthType = "SharedKey";

static constexpr const char* kAzureOAuthAuthType = "OAuth";

static constexpr const char* kAzureSASAuthType = "SAS";

class AbfsConfig {
public:
explicit AbfsConfig(std::string_view path, const config::ConfigBase& config);

std::string identity() const {
const auto hash = folly::Hash();
return std::to_string(hash(connectionString_));
std::unique_ptr<BlobClient> getReadFileClient();

std::unique_ptr<DataLakeFileClient> getWriteFileClient();

std::string filePath() const {
return filePath_;
}

/// Test only.
std::string fileSystem() const {
return fileSystem_;
}

/// Test only.
std::string connectionString() const {
return connectionString_;
}

std::string fileSystem() const {
return fileSystem_;
/// Test only.
std::string tenentId() const {
return tenentId_;
}

std::string filePath() const {
return filePath_;
/// Test only.
std::string authorityHost() const {
return authorityHost_;
}

private:
std::string getUrl(bool withblobSuffix);

std::string authType_;

// Container name is called FileSystem in some Azure API.
std::string fileSystem_;
std::string filePath_;
std::string connectionString_;

bool isHttps_;
std::string accountNameWithSuffix_;

std::string sas_;

std::string tenentId_;
std::string authorityHost_;
std::shared_ptr<Azure::Core::Credentials::TokenCredential> tokenCredential_;
};

} // namespace facebook::velox::filesystems
12 changes: 3 additions & 9 deletions velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h"

#include <azure/storage/blobs/blob_client.hpp>
#include <fmt/format.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <glog/logging.h>
Expand All @@ -27,19 +26,16 @@
#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"

namespace facebook::velox::filesystems {
using namespace Azure::Storage::Blobs;

class AbfsReadFile::Impl {
constexpr static uint64_t kNaturalReadSize = 4 << 20; // 4M
constexpr static uint64_t kReadConcurrency = 8;

public:
explicit Impl(std::string_view path, const config::ConfigBase& config) {
auto account = AbfsConfig(path, config);
filePath_ = account.filePath();
fileClient_ =
std::make_unique<BlobClient>(BlobClient::CreateFromConnectionString(
account.connectionString(), account.fileSystem(), filePath_));
auto abfsConfig = AbfsConfig(path, config);
filePath_ = abfsConfig.filePath();
fileClient_ = abfsConfig.getReadFileClient();
}

void initialize(const FileOptions& options) {
Expand All @@ -59,7 +55,6 @@ class AbfsReadFile::Impl {
} catch (Azure::Storage::StorageException& e) {
throwStorageExceptionWithOperationDetails("GetProperties", filePath_, e);
}

VELOX_CHECK_GE(length_, 0);
}

Expand Down Expand Up @@ -141,7 +136,6 @@ class AbfsReadFile::Impl {

Azure::Storage::Blobs::DownloadBlobOptions blob;
blob.Range = range;

auto response = fileClient_->Download(blob);
response.Value.BodyStream->ReadToCount(
reinterpret_cast<uint8_t*>(position), length);
Expand Down
19 changes: 6 additions & 13 deletions velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
*/

#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"
#include <azure/storage/files/datalake.hpp>
#include "velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"

using namespace Azure::Storage::Files::DataLake;

namespace facebook::velox::filesystems {
class DataLakeFileClientWrapper final : public AzureDataLakeFileClient {
public:
Expand All @@ -31,7 +28,8 @@ class DataLakeFileClientWrapper final : public AzureDataLakeFileClient {
client_->Create();
}

Models::PathProperties getProperties() override {
Azure::Storage::Files::DataLake::Models::PathProperties getProperties()
override {
return client_->GetProperties().Value;
}

Expand Down Expand Up @@ -120,16 +118,11 @@ class AbfsWriteFile::Impl {
AbfsWriteFile::AbfsWriteFile(
std::string_view path,
const config::ConfigBase& config) {
auto abfsAccount = AbfsConfig(path, config);
std::unique_ptr<AzureDataLakeFileClient> client =
auto abfsConfig = AbfsConfig(path, config);
std::unique_ptr<AzureDataLakeFileClient> clientWrapper =
std::make_unique<DataLakeFileClientWrapper>(
std::make_unique<DataLakeFileClient>(
DataLakeFileClient::CreateFromConnectionString(
abfsAccount.connectionString(),
abfsAccount.fileSystem(),
abfsAccount.filePath())));

impl_ = std::make_unique<Impl>(path, client);
abfsConfig.getWriteFileClient());
impl_ = std::make_unique<Impl>(path, clientWrapper);
}

AbfsWriteFile::AbfsWriteFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ if(VELOX_ENABLE_ABFS)
velox_core
velox_hive_config
velox_dwio_common_exception
Azure::azure-identity
Azure::azure-storage-blobs
Azure::azure-storage-files-datalake
Folly::folly
Expand Down
Loading
Loading