Skip to content

Commit

Permalink
WASB Support (#344)
Browse files Browse the repository at this point in the history
* multiple prefixes

* stable tests

* autolint

* add todos

* implement todos

* autolint

* more fixes

* autolint

* fix test

* fix check

* move tenant ID to env var

* another fix

* autolint

* fix another location check (?)

* autolint

* clean up for review

* polish

* autolint

* adjustments per review

* autolint
  • Loading branch information
eric-maynard authored Oct 8, 2024
1 parent bb47225 commit 6c02252
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Locale;
import org.jetbrains.annotations.NotNull;

/**
Expand All @@ -42,7 +43,8 @@ public String getFileIoImplClassName() {

@Override
public void validatePrefixForStorageType(String loc) {
if (!loc.startsWith(getStorageType().getPrefix())
if (getStorageType().getPrefixes().stream()
.noneMatch(p -> loc.toLowerCase(Locale.ROOT).startsWith(p))
&& !loc.startsWith("file:/")
&& !loc.startsWith("/")
&& !loc.equals("*")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
package org.apache.polaris.core.storage;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.polaris.core.context.CallContext;
Expand Down Expand Up @@ -62,8 +61,7 @@ public InMemoryStorageIntegration(String identifierOrId) {
@NotNull Set<String> locations) {
// trim trailing / from allowed locations so that locations missing the trailing slash still
// match
// TODO: Canonicalize with URI and compare scheme/authority/path components separately
TreeSet<String> allowedLocations =
Set<String> allowedLocationStrings =
storageConfig.getAllowedLocations().stream()
.map(
str -> {
Expand All @@ -74,7 +72,10 @@ public InMemoryStorageIntegration(String identifierOrId) {
}
})
.map(str -> str.replace("file:///", "file:/"))
.collect(Collectors.toCollection(TreeSet::new));
.collect(Collectors.toSet());
List<StorageLocation> allowedLocations =
allowedLocationStrings.stream().map(StorageLocation::of).collect(Collectors.toList());

boolean allowWildcardLocation =
Optional.ofNullable(CallContext.getCurrentContext())
.flatMap(c -> Optional.ofNullable(c.getPolarisCallContext()))
Expand All @@ -84,7 +85,7 @@ public InMemoryStorageIntegration(String identifierOrId) {
.getConfiguration(pc, "ALLOW_WILDCARD_LOCATION", false))
.orElse(false);

if (allowWildcardLocation && allowedLocations.contains("*")) {
if (allowWildcardLocation && allowedLocationStrings.contains("*")) {
return locations.stream()
.collect(
Collectors.toMap(
Expand All @@ -100,21 +101,9 @@ public InMemoryStorageIntegration(String identifierOrId) {
}
Map<String, Map<PolarisStorageActions, ValidationResult>> resultMap = new HashMap<>();
for (String rawLocation : locations) {
String location = rawLocation.replace("file:///", "file:/");
StringBuilder builder = new StringBuilder();
NavigableSet<String> prefixes = allowedLocations;
boolean validLocation = false;
for (char c : location.toCharArray()) {
builder.append(c);
prefixes = allowedLocations.tailSet(builder.toString(), true);
if (prefixes.isEmpty()) {
break;
} else if (prefixes.first().equals(builder.toString())) {
validLocation = true;
break;
}
}
final boolean isValidLocation = validLocation;
StorageLocation storageLocation = StorageLocation.of(rawLocation);
final boolean isValidLocation =
allowedLocations.stream().anyMatch(storageLocation::isChildOf);
Map<PolarisStorageActions, ValidationResult> locationResult =
actions.stream()
.collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,11 @@ private static List<String> userSpecifiedWriteLocations(Map<String, String> prop

/** Validate if the provided allowed locations are valid for the storage type */
protected void validatePrefixForStorageType(String loc) {
if (!loc.toLowerCase(Locale.ROOT).startsWith(storageType.prefix)) {
if (storageType.prefixes.stream().noneMatch(p -> loc.toLowerCase(Locale.ROOT).startsWith(p))) {
throw new IllegalArgumentException(
String.format(
"Location prefix not allowed: '%s', expected prefix: '%s'", loc, storageType.prefix));
"Location prefix not allowed: '%s', expected prefixes: '%s'",
loc, String.join(",", storageType.prefixes)));
}
}

Expand All @@ -240,19 +241,23 @@ public void validateMaxAllowedLocations(int maxAllowedLocations) {
/** Polaris' storage type, each has a fixed prefix for its location */
public enum StorageType {
S3("s3://"),
AZURE("abfs"), // abfs or abfss
AZURE(List.of("abfs://", "wasb://", "abfss://", "wasbs://")),
GCS("gs://"),
FILE("file://"),
;

final String prefix;
private final List<String> prefixes;

StorageType(String prefix) {
this.prefix = prefix;
this.prefixes = List.of(prefix);
}

public String getPrefix() {
return prefix;
StorageType(List<String> prefixes) {
this.prefixes = prefixes;
}

public List<String> getPrefixes() {
return prefixes;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.core.storage;

import jakarta.validation.constraints.NotNull;
import java.net.URI;
import org.apache.polaris.core.storage.azure.AzureLocation;

/** An abstraction over a storage location */
public class StorageLocation {
private final String location;

/** Create a StorageLocation from a String path */
public static StorageLocation of(String location) {
// TODO implement StorageLocation for all supported file systems and add isValidLocation
if (AzureLocation.isAzureLocation(location)) {
return new AzureLocation(location);
} else {
return new StorageLocation(location);
}
}

protected StorageLocation(@NotNull String location) {
if (location == null) {
this.location = null;
} else if (location.startsWith("file:/") && !location.startsWith("file:///")) {
this.location = URI.create(location.replaceFirst("file:/+", "file:///")).toString();
} else {
this.location = URI.create(location).toString();
}
}

/** If a path doesn't end in `/`, this will add one */
protected final String ensureTrailingSlash(String location) {
if (location == null || location.endsWith("/")) {
return location;
} else {
return location + "/";
}
}

@Override
public int hashCode() {
return location.hashCode();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof StorageLocation) {
return location.equals(((StorageLocation) obj).location);
} else {
return false;
}
}

@Override
public String toString() {
return location;
}

/**
* Returns true if this StorageLocation's location string starts with the other StorageLocation's
* location string
*/
public boolean isChildOf(StorageLocation potentialParent) {
if (this.location == null || potentialParent.location == null) {
return false;
} else {
String slashTerminatedLocation = ensureTrailingSlash(this.location);
String slashTerminatedParentLocation = ensureTrailingSlash(potentialParent.location);
return slashTerminatedLocation.startsWith(slashTerminatedParentLocation);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@

import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.polaris.core.storage.StorageLocation;
import org.jetbrains.annotations.NotNull;

/** This class represents all information for a azure location */
public class AzureLocation {
/** The pattern only allows abfs[s] now because the ResovlingFileIO only accept ADLSFileIO */
private static final Pattern URI_PATTERN = Pattern.compile("^abfss?://([^/?#]+)(.*)?$");
public class AzureLocation extends StorageLocation {
private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?|wasbs?)://([^/?#]+)(.*)?$");

public static final String ADLS_ENDPOINT = "dfs.core.windows.net";

public static final String BLOB_ENDPOINT = "blob.core.windows.net";

private final String scheme;
private final String storageAccount;
private final String container;

Expand All @@ -40,16 +41,19 @@ public class AzureLocation {
/**
* Construct an Azure location object from a location uri, it should follow this pattern:
*
* <p>{@code abfs[s]://[<container>@]<storage account host>/<file path>}
* <p>{@code (abfs|wasb)[s]://[<container>@]<storage account host>/<file path>}
*
* @param location a uri
*/
public AzureLocation(@NotNull String location) {
super(location);

Matcher matcher = URI_PATTERN.matcher(location);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid azure adls location uri " + location);
throw new IllegalArgumentException("Invalid azure location uri " + location);
}
String authority = matcher.group(1);
this.scheme = matcher.group(1);
String authority = matcher.group(2);
// look for <container>@<storage account host>
String[] parts = authority.split("@", -1);

Expand All @@ -65,7 +69,7 @@ public AzureLocation(@NotNull String location) {
}
this.storageAccount = hostParts[0];
this.endpoint = hostParts[1];
String path = matcher.group(2);
String path = matcher.group(3);
filePath = path == null ? "" : path.startsWith("/") ? path.substring(1) : path;
}

Expand All @@ -88,4 +92,37 @@ public String getEndpoint() {
public String getFilePath() {
return filePath;
}

/** Get the scheme */
public String getScheme() {
return scheme;
}

/**
* Returns true if the object this StorageLocation refers to is a child of the object referred to
* by the other StorageLocation.
*/
@Override
public boolean isChildOf(@NotNull StorageLocation potentialParent) {
if (potentialParent instanceof AzureLocation) {
AzureLocation potentialAzureParent = (AzureLocation) potentialParent;
if (this.container.equals(potentialAzureParent.container)) {
if (this.storageAccount.equals(potentialAzureParent.storageAccount)) {
String slashTerminatedFilePath = ensureTrailingSlash(this.filePath);
String slashTerminatedParentFilePath = ensureTrailingSlash(potentialAzureParent.filePath);
return slashTerminatedFilePath.startsWith(slashTerminatedParentFilePath);
}
}
}
return false;
}

/** Return true if the input location appears to be an Azure path */
public static boolean isAzureLocation(String location) {
if (location == null) {
return false;
}
Matcher matcher = URI_PATTERN.matcher(location);
return matcher.matches();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void testEmptyString() {
}

@ParameterizedTest
@ValueSource(strings = {"s3", "gcs", "abfs", "file"})
@ValueSource(strings = {"s3", "gcs", "abfs", "wasb", "file"})
public void testAbsolutePaths(String scheme) {
Assertions.assertThat(StorageUtil.getBucket(scheme + "://bucket/path/file.txt"))
.isEqualTo("bucket");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,6 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() {

private static @NotNull String s3Path(
String bucket, String keyPrefix, PolarisStorageConfigurationInfo.StorageType storageType) {
return storageType.getPrefix() + bucket + "/" + keyPrefix;
return "s3://" + bucket + "/" + keyPrefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ public class AzureCredentialStorageIntegrationTest {

private final String clientId = System.getenv("AZURE_CLIENT_ID");
private final String clientSecret = System.getenv("AZURE_CLIENT_SECRET");
private final String tenantId = "d479c7c9-2632-445a-b22d-7c19e68774f6";
private final String tenantId = System.getenv("AZURE_TENANT_ID");

private void assumeEnvVariablesNotNull() {
Assumptions.assumeThat(Strings.isNullOrEmpty(clientId) || Strings.isNullOrEmpty(clientSecret))
Assumptions.assumeThat(
Strings.isNullOrEmpty(clientId)
|| Strings.isNullOrEmpty(clientSecret)
|| Strings.isNullOrEmpty(tenantId))
.describedAs("Null Azure testing environment variables!")
.isFalse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.polaris.service.storage.azure;

import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.core.storage.azure.AzureLocation;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -34,12 +35,28 @@ public void testLocation() {
Assertions.assertThat(azureLocation.getFilePath()).isEqualTo("myfile");
}

@Test
public void testWasbLocation() {
String uri = "wasb://[email protected]/myfile";
AzureLocation azureLocation = new AzureLocation(uri);
Assertions.assertThat(azureLocation.getContainer()).isEqualTo("container");
Assertions.assertThat(azureLocation.getStorageAccount()).isEqualTo("storageaccount");
Assertions.assertThat(azureLocation.getEndpoint()).isEqualTo("blob.core.windows.net");
Assertions.assertThat(azureLocation.getFilePath()).isEqualTo("myfile");
}

@Test
public void testCrossSchemeComparisons() {
StorageLocation abfsLocation =
AzureLocation.of("abfss://[email protected]/some/file/x");
StorageLocation wasbLocation =
AzureLocation.of("wasb://[email protected]/some/file");
Assertions.assertThat(abfsLocation).isNotEqualTo(wasbLocation);
Assertions.assertThat(abfsLocation.isChildOf(wasbLocation)).isTrue();
}

@Test
public void testLocation_negative_cases() {
Assertions.assertThatThrownBy(
() ->
new AzureLocation("wasbs://[email protected]/myfile"))
.isInstanceOf(IllegalArgumentException.class);
Assertions.assertThatThrownBy(
() -> new AzureLocation("abfss://storageaccount.blob.core.windows.net/myfile"))
.isInstanceOf(IllegalArgumentException.class);
Expand Down
Loading

0 comments on commit 6c02252

Please sign in to comment.