Skip to content

Commit fe79cc2

Browse files
pavibhai authored and ssvinarchuk committed
Added support for s3a scheme (apache#1932)
(cherry picked from commit a385268)
1 parent c9b064b commit fe79cc2

File tree

11 files changed

+537
-49
lines changed

11 files changed

+537
-49
lines changed

polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ protected void validatePrefixForStorageType(String loc) {
233233

234234
/** Polaris' storage type, each has a fixed prefix for its location */
235235
public enum StorageType {
236-
S3("s3://"),
236+
S3(List.of("s3://", "s3a://")),
237237
AZURE(List.of("abfs://", "wasb://", "abfss://", "wasbs://")),
238238
GCS("gs://"),
239239
FILE("file://"),

polaris-core/src/main/java/org/apache/polaris/core/storage/StorageLocation.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.net.URI;
2323
import java.util.regex.Matcher;
2424
import java.util.regex.Pattern;
25+
import org.apache.polaris.core.storage.aws.S3Location;
2526
import org.apache.polaris.core.storage.azure.AzureLocation;
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ public static StorageLocation of(String location) {
4041
// TODO implement StorageLocation for all supported file systems and add isValidLocation
4142
if (AzureLocation.isAzureLocation(location)) {
4243
return new AzureLocation(location);
44+
} else if (S3Location.isS3Location(location)) {
45+
return new S3Location(location);
4346
} else {
4447
return new StorageLocation(location);
4548
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.core.storage.aws;
21+
22+
import jakarta.annotation.Nonnull;
23+
import java.util.regex.Matcher;
24+
import java.util.regex.Pattern;
25+
import org.apache.polaris.core.storage.StorageLocation;
26+
27+
public class S3Location extends StorageLocation {
28+
private static final Pattern URI_PATTERN = Pattern.compile("^(s3a?):(.+)$");
29+
private final String scheme;
30+
private final String locationWithoutScheme;
31+
32+
public S3Location(@Nonnull String location) {
33+
super(location);
34+
Matcher matcher = URI_PATTERN.matcher(location);
35+
if (!matcher.matches()) {
36+
throw new IllegalArgumentException("Invalid S3 location uri " + location);
37+
}
38+
this.scheme = matcher.group(1);
39+
this.locationWithoutScheme = matcher.group(2);
40+
}
41+
42+
public static boolean isS3Location(String location) {
43+
if (location == null) {
44+
return false;
45+
}
46+
Matcher matcher = URI_PATTERN.matcher(location);
47+
return matcher.matches();
48+
}
49+
50+
@Override
51+
public boolean isChildOf(StorageLocation potentialParent) {
52+
if (potentialParent instanceof S3Location) {
53+
S3Location that = (S3Location) potentialParent;
54+
// Given that S3 and S3A are to be treated similarly, the parent check ignores the prefix
55+
String slashTerminatedObjectKey = ensureTrailingSlash(this.locationWithoutScheme);
56+
String slashTerminatedObjectKeyThat = ensureTrailingSlash(that.locationWithoutScheme);
57+
return slashTerminatedObjectKey.startsWith(slashTerminatedObjectKeyThat);
58+
}
59+
return false;
60+
}
61+
62+
public String getScheme() {
63+
return scheme;
64+
}
65+
66+
@Override
67+
public String withoutScheme() {
68+
return locationWithoutScheme;
69+
}
70+
}

polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,42 +33,46 @@
3333
import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
3434
import org.assertj.core.api.Assertions;
3535
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.params.ParameterizedTest;
37+
import org.junit.jupiter.params.provider.CsvSource;
38+
import org.junit.jupiter.params.provider.ValueSource;
3639
import org.mockito.Mockito;
3740

3841
class InMemoryStorageIntegrationTest {
3942

40-
@Test
41-
public void testValidateAccessToLocations() {
43+
@ParameterizedTest
44+
@CsvSource({"s3,s3", "s3,s3a", "s3a,s3", "s3a,s3a"})
45+
public void testValidateAccessToLocations(String allowedScheme, String locationScheme) {
4246
MockInMemoryStorageIntegration storage = new MockInMemoryStorageIntegration();
4347
Map<String, Map<PolarisStorageActions, PolarisStorageIntegration.ValidationResult>> result =
4448
storage.validateAccessToLocations(
4549
new AwsStorageConfigurationInfo(
4650
PolarisStorageConfigurationInfo.StorageType.S3,
4751
List.of(
48-
"s3://bucket/path/to/warehouse",
49-
"s3://bucket/anotherpath/to/warehouse",
50-
"s3://bucket2/warehouse/"),
52+
allowedScheme + "://bucket/path/to/warehouse",
53+
allowedScheme + "://bucket/anotherpath/to/warehouse",
54+
allowedScheme + "://bucket2/warehouse/"),
5155
"arn:aws:iam::012345678901:role/jdoe",
5256
"us-east-2"),
5357
Set.of(PolarisStorageActions.READ),
5458
Set.of(
55-
"s3://bucket/path/to/warehouse/namespace/table",
56-
"s3://bucket2/warehouse",
57-
"s3://arandombucket/path/to/warehouse/namespace/table"));
59+
locationScheme + "://bucket/path/to/warehouse/namespace/table",
60+
locationScheme + "://bucket2/warehouse",
61+
locationScheme + "://arandombucket/path/to/warehouse/namespace/table"));
5862
Assertions.assertThat(result)
5963
.hasSize(3)
6064
.containsEntry(
61-
"s3://bucket/path/to/warehouse/namespace/table",
65+
locationScheme + "://bucket/path/to/warehouse/namespace/table",
6266
Map.of(
6367
PolarisStorageActions.READ,
6468
new PolarisStorageIntegration.ValidationResult(true, "")))
6569
.containsEntry(
66-
"s3://bucket2/warehouse",
70+
locationScheme + "://bucket2/warehouse",
6771
Map.of(
6872
PolarisStorageActions.READ,
6973
new PolarisStorageIntegration.ValidationResult(true, "")))
7074
.containsEntry(
71-
"s3://arandombucket/path/to/warehouse/namespace/table",
75+
locationScheme + "://arandombucket/path/to/warehouse/namespace/table",
7276
Map.of(
7377
PolarisStorageActions.READ,
7478
new PolarisStorageIntegration.ValidationResult(false, "")));
@@ -89,8 +93,9 @@ public void testAwsAccountIdParsing() {
8993
Assertions.assertThat(actualAccountId).isEqualTo(expectedAccountId);
9094
}
9195

92-
@Test
93-
public void testValidateAccessToLocationsWithWildcard() {
96+
@ParameterizedTest
97+
@ValueSource(strings = {"s3", "s3a"})
98+
public void testValidateAccessToLocationsWithWildcard(String s3Scheme) {
9499
MockInMemoryStorageIntegration storage = new MockInMemoryStorageIntegration();
95100
Map<String, Boolean> config = Map.of("ALLOW_WILDCARD_LOCATION", true);
96101
PolarisCallContext polarisCallContext =
@@ -113,13 +118,13 @@ public void testValidateAccessToLocationsWithWildcard() {
113118
new FileStorageConfigurationInfo(List.of("file://", "*")),
114119
Set.of(PolarisStorageActions.READ),
115120
Set.of(
116-
"s3://bucket/path/to/warehouse/namespace/table",
121+
s3Scheme + "://bucket/path/to/warehouse/namespace/table",
117122
"file:///etc/passwd",
118123
"a/relative/subdirectory"));
119124
Assertions.assertThat(result)
120125
.hasSize(3)
121126
.hasEntrySatisfying(
122-
"s3://bucket/path/to/warehouse/namespace/table",
127+
s3Scheme + "://bucket/path/to/warehouse/namespace/table",
123128
val ->
124129
Assertions.assertThat(val)
125130
.hasSize(1)

polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public void testEmptyString() {
3131
}
3232

3333
@ParameterizedTest
34-
@ValueSource(strings = {"s3", "gcs", "abfs", "wasb", "file"})
34+
@ValueSource(strings = {"s3", "s3a", "gcs", "abfs", "wasb", "file"})
3535
public void testAbsolutePaths(String scheme) {
3636
Assertions.assertThat(StorageUtil.getBucket(scheme + "://bucket/path/file.txt"))
3737
.isEqualTo("bucket");
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.core.storage.aws;
21+
22+
import org.apache.polaris.core.storage.StorageLocation;
23+
import org.assertj.core.api.Assertions;
24+
import org.junit.jupiter.params.ParameterizedTest;
25+
import org.junit.jupiter.params.provider.CsvSource;
26+
import org.junit.jupiter.params.provider.ValueSource;
27+
28+
class S3LocationTest {
29+
@ParameterizedTest
30+
@ValueSource(strings = {"s3a", "s3"})
31+
public void testLocation(String scheme) {
32+
String locInput = scheme + "://bucket/schema1/table1";
33+
StorageLocation loc = StorageLocation.of(locInput);
34+
Assertions.assertThat(loc).isInstanceOf(S3Location.class);
35+
S3Location s3Loc = (S3Location) loc;
36+
Assertions.assertThat(s3Loc.getScheme()).isEqualTo(scheme);
37+
Assertions.assertThat(s3Loc.withoutScheme()).isEqualTo("//bucket/schema1/table1");
38+
Assertions.assertThat(s3Loc.withoutScheme()).doesNotStartWith(scheme);
39+
Assertions.assertThat(scheme + ":" + s3Loc.withoutScheme()).isEqualTo(locInput);
40+
}
41+
42+
@ParameterizedTest
43+
@CsvSource({"s3,s3a", "s3a,s3"})
44+
public void testPrefixValidationIgnoresScheme(String parentScheme, String childScheme) {
45+
StorageLocation loc1 = StorageLocation.of(childScheme + "://bucket/schema1/table1");
46+
StorageLocation loc2 = StorageLocation.of(parentScheme + "://bucket/schema1");
47+
Assertions.assertThat(loc1.isChildOf(loc2)).isTrue();
48+
49+
StorageLocation loc3 = StorageLocation.of(childScheme + "://bucket/schema1");
50+
Assertions.assertThat(loc2.equals(loc3)).isFalse();
51+
}
52+
}

polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,9 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest {
6464
.build();
6565
public static final String AWS_PARTITION = "aws";
6666

67-
@Test
68-
public void testGetSubscopedCreds() {
67+
@ParameterizedTest
68+
@ValueSource(strings = {"s3a", "s3"})
69+
public void testGetSubscopedCreds(String scheme) {
6970
StsClient stsClient = Mockito.mock(StsClient.class);
7071
String roleARN = "arn:aws:iam::012345678901:role/jdoe";
7172
String externalId = "externalId";
@@ -76,10 +77,13 @@ public void testGetSubscopedCreds() {
7677
.isInstanceOf(AssumeRoleRequest.class)
7778
.asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class))
7879
.returns(externalId, AssumeRoleRequest::externalId)
79-
.returns(roleARN, AssumeRoleRequest::roleArn);
80+
.returns(roleARN, AssumeRoleRequest::roleArn)
81+
// ensure that the policy content does not refer to S3A
82+
.extracting(AssumeRoleRequest::policy)
83+
.doesNotMatch(s -> s.contains("s3a"));
8084
return ASSUME_ROLE_RESPONSE;
8185
});
82-
String warehouseDir = "s3://bucket/path/to/warehouse";
86+
String warehouseDir = scheme + "://bucket/path/to/warehouse";
8387
EnumMap<StorageAccessProperty, String> credentials =
8488
new AwsCredentialsStorageIntegration(stsClient)
8589
.getSubscopedCreds(

runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,25 @@ public class PolarisOverlappingCatalogTest {
4343
TestServices.builder().config(Map.of("ALLOW_OVERLAPPING_CATALOG_URLS", "false")).build();
4444

4545
private Response createCatalog(String prefix, String defaultBaseLocation, boolean isExternal) {
46-
return createCatalog(prefix, defaultBaseLocation, isExternal, new ArrayList<String>());
46+
return createCatalog("s3", prefix, defaultBaseLocation, isExternal, new ArrayList<String>());
4747
}
4848

4949
private Response createCatalog(
50+
String s3Scheme, String prefix, String defaultBaseLocation, boolean isExternal) {
51+
return createCatalog(
52+
s3Scheme, prefix, defaultBaseLocation, isExternal, new ArrayList<String>());
53+
}
54+
55+
private Response createCatalog(
56+
String prefix,
57+
String defaultBaseLocation,
58+
boolean isExternal,
59+
List<String> allowedLocations) {
60+
return createCatalog("s3", prefix, defaultBaseLocation, isExternal, allowedLocations);
61+
}
62+
63+
private Response createCatalog(
64+
String s3Scheme,
5065
String prefix,
5166
String defaultBaseLocation,
5267
boolean isExternal,
@@ -62,15 +77,16 @@ private Response createCatalog(
6277
allowedLocations.stream()
6378
.map(
6479
l -> {
65-
return String.format("s3://bucket/%s/%s", prefix, l);
80+
return String.format(s3Scheme + "://bucket/%s/%s", prefix, l);
6681
})
6782
.toList())
6883
.build();
6984
Catalog catalog =
7085
new Catalog(
7186
isExternal ? Catalog.TypeEnum.EXTERNAL : Catalog.TypeEnum.INTERNAL,
7287
String.format("overlap_catalog_%s", uuid),
73-
new CatalogProperties(String.format("s3://bucket/%s/%s", prefix, defaultBaseLocation)),
88+
new CatalogProperties(
89+
String.format(s3Scheme + "://bucket/%s/%s", prefix, defaultBaseLocation)),
7490
System.currentTimeMillis(),
7591
System.currentTimeMillis(),
7692
1,
@@ -112,6 +128,20 @@ public void testBasicOverlappingCatalogs(boolean initiallyExternal, boolean late
112128
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");
113129
}
114130

131+
@ParameterizedTest
132+
@CsvSource({"s3,s3a", "s3a,s3"})
133+
public void testBasicOverlappingCatalogWSchemeChange(String rootScheme, String overlapScheme) {
134+
String prefix = UUID.randomUUID().toString();
135+
136+
assertThat(createCatalog(rootScheme, prefix, "root", false))
137+
.returns(Response.Status.CREATED.getStatusCode(), Response::getStatus);
138+
139+
// - inside `root` but using different scheme
140+
assertThatThrownBy(() -> createCatalog(overlapScheme, prefix, "root/child", false))
141+
.isInstanceOf(ValidationException.class)
142+
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");
143+
}
144+
115145
@ParameterizedTest
116146
@CsvSource({"true, true", "true, false", "false, true", "false, false"})
117147
public void testAllowedLocationOverlappingCatalogs(
@@ -146,4 +176,37 @@ public void testAllowedLocationOverlappingCatalogs(
146176
.isInstanceOf(ValidationException.class)
147177
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");
148178
}
179+
180+
@ParameterizedTest
181+
@CsvSource({"s3,s3a", "s3a,s3"})
182+
public void testAllowedLocationOverlappingCatalogsWSchemeChange(
183+
String rootScheme, String overlapScheme) {
184+
String prefix = UUID.randomUUID().toString();
185+
186+
assertThat(createCatalog(rootScheme, prefix, "animals", false, Arrays.asList("dogs", "cats")))
187+
.returns(Response.Status.CREATED.getStatusCode(), Response::getStatus);
188+
189+
// This DBL overlaps with initial AL
190+
assertThatThrownBy(
191+
() ->
192+
createCatalog(
193+
overlapScheme, prefix, "dogs", false, Arrays.asList("huskies", "labs")))
194+
.isInstanceOf(ValidationException.class)
195+
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");
196+
197+
// This AL overlaps with initial DBL
198+
assertThatThrownBy(
199+
() ->
200+
createCatalog(
201+
overlapScheme, prefix, "kingdoms", false, Arrays.asList("plants", "animals")))
202+
.isInstanceOf(ValidationException.class)
203+
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");
204+
205+
// This AL overlaps with an initial AL
206+
assertThatThrownBy(
207+
() ->
208+
createCatalog(overlapScheme, prefix, "plays", false, Arrays.asList("rent", "cats")))
209+
.isInstanceOf(ValidationException.class)
210+
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");
211+
}
149212
}

0 commit comments

Comments
 (0)