Skip to content

Commit 24683d8

Browse files
authored
Implement federation to HadoopCatalog (#1466)
* wip * quarkus fixes * autolint * hadoop impl * autolint * Refactors * refactored * autolint * add config * autolint * stable * Remove breakpoint anchor * add line to application.properties * yank HADOOP * autolint
1 parent e276791 commit 24683d8

File tree

17 files changed

+185
-17
lines changed

17 files changed

+185
-17
lines changed

polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Optional;
2323
import org.apache.polaris.core.admin.model.StorageConfigInfo;
24+
import org.apache.polaris.core.connection.ConnectionType;
2425
import org.apache.polaris.core.context.CallContext;
2526
import org.apache.polaris.core.persistence.cache.EntityWeigher;
2627

@@ -241,4 +242,11 @@ public static void enforceFeatureEnabledOrThrow(
241242
.description("If true, the policy-store endpoints are enabled")
242243
.defaultValue(true)
243244
.buildFeatureConfiguration();
245+
246+
public static final FeatureConfiguration<List<String>> SUPPORTED_CATALOG_CONNECTION_TYPES =
247+
PolarisConfiguration.<List<String>>builder()
248+
.key("SUPPORTED_CATALOG_CONNECTION_TYPES")
249+
.description("The list of supported catalog connection types for federation")
250+
.defaultValue(List.of(ConnectionType.ICEBERG_REST.name()))
251+
.buildFeatureConfiguration();
244252
}

polaris-core/src/main/java/org/apache/polaris/core/connection/AuthenticationParametersDpo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.polaris.core.admin.model.AuthenticationParameters;
2626
import org.apache.polaris.core.admin.model.BearerAuthenticationParameters;
2727
import org.apache.polaris.core.admin.model.OAuthClientCredentialsParameters;
28+
import org.apache.polaris.core.connection.iceberg.IcebergCatalogPropertiesProvider;
2829
import org.apache.polaris.core.secrets.UserSecretReference;
2930

3031
/**

polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@
3333
import java.util.Map;
3434
import org.apache.polaris.core.PolarisDiagnostics;
3535
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
36+
import org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo;
3637
import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
38+
import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
39+
import org.apache.polaris.core.connection.iceberg.IcebergCatalogPropertiesProvider;
40+
import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
3741
import org.apache.polaris.core.secrets.UserSecretReference;
3842
import org.slf4j.Logger;
3943
import org.slf4j.LoggerFactory;
@@ -48,6 +52,7 @@
4852
property = "connectionTypeCode")
4953
@JsonSubTypes({
5054
@JsonSubTypes.Type(value = IcebergRestConnectionConfigInfoDpo.class, name = "1"),
55+
@JsonSubTypes.Type(value = HadoopConnectionConfigInfoDpo.class, name = "2"),
5156
})
5257
public abstract class ConnectionConfigInfoDpo implements IcebergCatalogPropertiesProvider {
5358
private static final Logger logger = LoggerFactory.getLogger(ConnectionConfigInfoDpo.class);
@@ -140,11 +145,12 @@ public static ConnectionConfigInfoDpo fromConnectionConfigInfoModelWithSecrets(
140145
ConnectionConfigInfo connectionConfigurationModel,
141146
Map<String, UserSecretReference> secretReferences) {
142147
ConnectionConfigInfoDpo config = null;
148+
final AuthenticationParametersDpo authenticationParameters;
143149
switch (connectionConfigurationModel.getConnectionType()) {
144150
case ICEBERG_REST:
145151
IcebergRestConnectionConfigInfo icebergRestConfigModel =
146152
(IcebergRestConnectionConfigInfo) connectionConfigurationModel;
147-
AuthenticationParametersDpo authenticationParameters =
153+
authenticationParameters =
148154
AuthenticationParametersDpo.fromAuthenticationParametersModelWithSecrets(
149155
icebergRestConfigModel.getAuthenticationParameters(), secretReferences);
150156
config =
@@ -153,6 +159,18 @@ public static ConnectionConfigInfoDpo fromConnectionConfigInfoModelWithSecrets(
153159
authenticationParameters,
154160
icebergRestConfigModel.getRemoteCatalogName());
155161
break;
162+
case HADOOP:
163+
HadoopConnectionConfigInfo hadoopConfigModel =
164+
(HadoopConnectionConfigInfo) connectionConfigurationModel;
165+
authenticationParameters =
166+
AuthenticationParametersDpo.fromAuthenticationParametersModelWithSecrets(
167+
hadoopConfigModel.getAuthenticationParameters(), secretReferences);
168+
config =
169+
new HadoopConnectionConfigInfoDpo(
170+
hadoopConfigModel.getUri(),
171+
authenticationParameters,
172+
hadoopConfigModel.getWarehouse());
173+
break;
156174
default:
157175
throw new IllegalStateException(
158176
"Unsupported connection type: " + connectionConfigurationModel.getConnectionType());

polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
public enum ConnectionType {
3333
NULL_TYPE(0),
3434
ICEBERG_REST(1),
35+
HADOOP(2),
3536
;
3637

3738
private static final ConnectionType[] REVERSE_MAPPING_ARRAY;
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.core.connection.hadoop;
20+
21+
import com.fasterxml.jackson.annotation.JsonProperty;
22+
import com.google.common.base.MoreObjects;
23+
import jakarta.annotation.Nonnull;
24+
import jakarta.annotation.Nullable;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import org.apache.iceberg.CatalogProperties;
28+
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
29+
import org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo;
30+
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
31+
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
32+
import org.apache.polaris.core.connection.ConnectionType;
33+
import org.apache.polaris.core.secrets.UserSecretsManager;
34+
35+
/**
36+
* The internal persistence-object counterpart to {@link
37+
* org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo} defined in the API model.
38+
*/
39+
public class HadoopConnectionConfigInfoDpo extends ConnectionConfigInfoDpo {
40+
41+
private final String warehouse;
42+
43+
public HadoopConnectionConfigInfoDpo(
44+
@JsonProperty(value = "uri", required = true) @Nonnull String uri,
45+
@JsonProperty(value = "authenticationParameters", required = true) @Nonnull
46+
AuthenticationParametersDpo authenticationParameters,
47+
@JsonProperty(value = "warehouse", required = false) @Nullable String remoteCatalogName) {
48+
super(ConnectionType.HADOOP.getCode(), uri, authenticationParameters);
49+
this.warehouse = remoteCatalogName;
50+
}
51+
52+
public String getWarehouse() {
53+
return warehouse;
54+
}
55+
56+
@Override
57+
public String toString() {
58+
return MoreObjects.toStringHelper(this)
59+
.add("connectionTypeCode", getConnectionTypeCode())
60+
.add("uri", getUri())
61+
.add("warehouse", getWarehouse())
62+
.add("authenticationParameters", getAuthenticationParameters().toString())
63+
.toString();
64+
}
65+
66+
@Override
67+
public @Nonnull Map<String, String> asIcebergCatalogProperties(
68+
UserSecretsManager secretsManager) {
69+
HashMap<String, String> properties = new HashMap<>();
70+
properties.put(CatalogProperties.URI, getUri());
71+
if (getWarehouse() != null) {
72+
properties.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouse());
73+
}
74+
properties.putAll(getAuthenticationParameters().asIcebergCatalogProperties(secretsManager));
75+
return properties;
76+
}
77+
78+
@Override
79+
public ConnectionConfigInfo asConnectionConfigInfoModel() {
80+
return HadoopConnectionConfigInfo.builder()
81+
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.HADOOP)
82+
.setUri(getUri())
83+
.setWarehouse(getWarehouse())
84+
.setAuthenticationParameters(
85+
getAuthenticationParameters().asAuthenticationParametersModel())
86+
.build();
87+
}
88+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.polaris.core.connection;
19+
package org.apache.polaris.core.connection.iceberg;
2020

2121
import jakarta.annotation.Nonnull;
2222
import java.util.Map;
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.polaris.core.connection;
19+
package org.apache.polaris.core.connection.iceberg;
2020

2121
import com.fasterxml.jackson.annotation.JsonProperty;
2222
import com.google.common.base.MoreObjects;
@@ -27,6 +27,9 @@
2727
import org.apache.iceberg.CatalogProperties;
2828
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
2929
import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
30+
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
31+
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
32+
import org.apache.polaris.core.connection.ConnectionType;
3033
import org.apache.polaris.core.secrets.UserSecretsManager;
3134

3235
/**

quarkus/defaults/src/main/resources/application.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ polaris.realm-context.require-header=false
111111
polaris.features.defaults."ENFORCE_PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_CHECKING"=false
112112
polaris.features.defaults."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE","FILE"]
113113
# polaris.features.defaults."ENABLE_CATALOG_FEDERATION"=true
114+
polaris.features.defaults."SUPPORTED_CATALOG_CONNECTION_TYPES"=["ICEBERG_REST"]
114115

115116
# realm overrides
116117
# polaris.features.realm-overrides."my-realm"."INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST"=true

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@
8787
import org.apache.polaris.service.config.DefaultConfigurationStore;
8888
import org.apache.polaris.service.config.RealmEntityManagerFactory;
8989
import org.apache.polaris.service.config.ReservedProperties;
90-
import org.apache.polaris.service.context.CallContextCatalogFactory;
91-
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
90+
import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
91+
import org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory;
9292
import org.apache.polaris.service.events.PolarisEventListener;
9393
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
9494
import org.apache.polaris.service.task.TaskExecutor;

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@
6868
import org.apache.polaris.service.catalog.iceberg.IcebergCatalogHandler;
6969
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
7070
import org.apache.polaris.service.config.RealmEntityManagerFactory;
71-
import org.apache.polaris.service.context.CallContextCatalogFactory;
72-
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
71+
import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
72+
import org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory;
7373
import org.apache.polaris.service.http.IfNoneMatch;
7474
import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase;
7575
import org.apache.polaris.service.types.NotificationRequest;

0 commit comments

Comments
 (0)