Skip to content

Commit

Permalink
[apache#4993] feat(iceberg): integrate credential framework to iceber…
Browse files Browse the repository at this point in the history
…g REST server (apache#5134)

### What changes were proposed in this pull request?
integrate credential framework to iceberg REST server

### Why are the changes needed?

Fix: apache#4993 

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
set up a local environment to request credential with the configuration
`spark.sql.catalog.rest.header.X-Iceberg-Access-Delegation=vended-credentials`
  • Loading branch information
FANNG1 authored and mplmoknijb committed Nov 6, 2024
1 parent 6d3b8ab commit e0fcf3a
Show file tree
Hide file tree
Showing 23 changed files with 711 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ public class IcebergConstants {
public static final String ICEBERG_REST_CATALOG_CACHE_EVICTION_INTERVAL =
"catalog-cache-eviction-interval-ms";

public static final String ICEBERG_REST_CATALOG_PROVIDER = "catalog-provider";
public static final String ICEBERG_REST_CATALOG_CONFIG_PROVIDER = "catalog-config-provider";
public static final String STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME = "static-config-provider";
public static final String DYNAMIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME =
"dynamic-config-provider";

public static final String GRAVITINO_URI = "gravitino-uri";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

public class CredentialConstants {
public static final String CREDENTIAL_PROVIDER_TYPE = "credential-provider-type";

private CredentialConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
*/
package org.apache.gravitino.catalog.lakehouse.iceberg.ops;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper.IcebergTableChange;
import org.apache.gravitino.rel.TableChange;
Expand Down Expand Up @@ -79,7 +81,7 @@ public class TestIcebergTableUpdate {

@BeforeEach
public void init() {
icebergCatalogWrapper = new IcebergCatalogWrapper();
icebergCatalogWrapper = new IcebergCatalogWrapper(new IcebergConfig(Collections.emptyMap()));
icebergCatalogWrapperHelper =
new IcebergCatalogWrapperHelper(icebergCatalogWrapper.getCatalog());
createNamespace(TEST_NAMESPACE_NAME);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

import java.util.Map;

/**
* Helper class to generate specific credential properties for different table format and engine.
*/
public class CredentialPropertyUtils {
/**
* Transforms a specific credential into a map of Iceberg properties.
*
* @param credential the credential to be transformed into Iceberg properties
* @return a map of Iceberg properties derived from the credential
*/
public static Map<String, String> toIcebergProperties(Credential credential) {
// todo: transform specific credential to iceberg properties
return credential.toProperties();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CredentialProviderManager {

private static final Logger LOG = LoggerFactory.getLogger(CredentialProviderManager.class);
private Map<String, CredentialProvider> credentialProviders;

public CredentialProviderManager() {
this.credentialProviders = new ConcurrentHashMap<>();
}

public void registerCredentialProvider(
String catalogName, CredentialProvider credentialProvider) {
CredentialProvider current = credentialProviders.putIfAbsent(catalogName, credentialProvider);
Preconditions.checkState(
!credentialProvider.equals(current),
String.format(
"Should not register multiple times to CredentialProviderManager, catalog: %s, "
+ "credential provider: %s",
catalogName, credentialProvider.credentialType()));
LOG.info(
"Register catalog:%s credential provider:%s to CredentialProviderManager",
catalogName, credentialProvider.credentialType());
}

public void unregisterCredentialProvider(String catalogName) {
CredentialProvider credentialProvider = credentialProviders.remove(catalogName);
// Not all catalog has credential provider
if (credentialProvider != null) {
LOG.info(
"Unregister catalog:{} credential provider:{} to CredentialProviderManager",
catalogName,
credentialProvider.credentialType());
try {
credentialProvider.close();
} catch (IOException e) {
LOG.warn("Close credential provider failed", e);
}
}
}

@Nullable
public CredentialProvider getCredentialProvider(String catalogName) {
return credentialProviders.get(catalogName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

import com.google.common.collect.ImmutableSet;
import org.apache.gravitino.utils.PrincipalUtils;

public class CredentialUtils {
public static Credential vendCredential(CredentialProvider credentialProvider, String path) {
PathBasedCredentialContext pathBasedCredentialContext =
new PathBasedCredentialContext(
PrincipalUtils.getCurrentUserName(), ImmutableSet.of(path), ImmutableSet.of());
return credentialProvider.getCredential(pathBasedCredentialContext);
}
}
1 change: 1 addition & 0 deletions iceberg/iceberg-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ plugins {
}

dependencies {
implementation(project(":api"))
implementation(project(":catalogs:catalog-common"))
implementation(project(":core")) {
exclude("*")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.gravitino.storage.S3Properties;

Expand Down Expand Up @@ -201,13 +202,13 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
.longConf()
.createWithDefault(3600000L);

public static final ConfigEntry<String> ICEBERG_REST_CATALOG_PROVIDER =
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER)
public static final ConfigEntry<String> ICEBERG_REST_CATALOG_CONFIG_PROVIDER =
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER)
.doc(
"Catalog provider class name, you can develop a class that implements `IcebergCatalogWrapperProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.")
"Catalog provider class name, you can develop a class that implements `IcebergCatalogConfigProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.createWithDefault("config-based-provider");
.createWithDefault(IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME);

public static final ConfigEntry<String> GRAVITINO_URI =
new ConfigBuilder(IcebergConstants.GRAVITINO_URI)
Expand All @@ -233,6 +234,13 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
.toSequence()
.createWithDefault(Collections.emptyList());

public static final ConfigEntry<String> CREDENTIAL_PROVIDER_TYPE =
new ConfigBuilder(CredentialConstants.CREDENTIAL_PROVIDER_TYPE)
.doc("The credential provider type for Iceberg")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();

public String getJdbcDriver() {
return get(JDBC_DRIVER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.gravitino.iceberg.common.ops;

import java.util.Map;
import java.util.Optional;
import org.apache.gravitino.iceberg.common.IcebergConfig;

/**
* IcebergCatalogWrapperProvider is an interface defining how Iceberg REST catalog server gets
* Iceberg catalogs.
* {@code IcebergCatalogConfigProvider} is an interface defining how Iceberg REST catalog server
* gets Iceberg catalog configurations.
*/
public interface IcebergCatalogWrapperProvider {
public interface IcebergCatalogConfigProvider {

/**
* @param properties The parameters for creating Provider which from configurations whose prefix
Expand All @@ -33,8 +35,8 @@ public interface IcebergCatalogWrapperProvider {
void initialize(Map<String, String> properties);

/**
* @param catalogName a param send by clients.
* @return the instance of IcebergCatalogWrapper.
* @param catalogName Iceberg catalog name.
* @return the configuration of Iceberg catalog.
*/
IcebergCatalogWrapper getIcebergTableOps(String catalogName);
Optional<IcebergConfig> getIcebergCatalogConfig(String catalogName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableSet;
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -104,10 +103,6 @@ public IcebergCatalogWrapper(IcebergConfig icebergConfig) {
this.catalogPropertiesMap = icebergConfig.getIcebergCatalogProperties();
}

public IcebergCatalogWrapper() {
this(new IcebergConfig(Collections.emptyMap()));
}

private void validateNamespace(Optional<Namespace> namespace) {
namespace.ifPresent(
n ->
Expand Down Expand Up @@ -160,7 +155,7 @@ public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest
/**
* Reload hadoop configuration, this is useful when the hadoop configuration UserGroupInformation
* is shared by multiple threads. UserGroupInformation#authenticationMethod was first initialized
* in KerberosClient, however, when switching to iceberg-rest thead,
* in KerberosClient, however, when switching to iceberg-rest thread,
* UserGroupInformation#authenticationMethod will be reset to the default value; we need to
* reinitialize it again.
*/
Expand Down Expand Up @@ -271,7 +266,7 @@ public void close() throws Exception {
private void closeMySQLCatalogResource() {
try {
// Close thread AbandonedConnectionCleanupThread if we are using `com.mysql.cj.jdbc.Driver`,
// for driver `com.mysql.jdbc.Driver` (deprecated), the daemon thead maybe not this one.
// for driver `com.mysql.jdbc.Driver` (deprecated), the daemon thread maybe not this one.
Class.forName("com.mysql.cj.jdbc.AbandonedConnectionCleanupThread")
.getMethod("uncheckedShutdown")
.invoke(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.client.GravitinoAdminClient;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapperProvider;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,13 +40,10 @@
*
* <p>The catalogName is iceberg_catalog
*/
public class GravitinoBasedIcebergCatalogWrapperProvider
implements IcebergCatalogWrapperProvider, AutoCloseable {
public class DynamicIcebergCatalogConfigProvider
implements IcebergCatalogConfigProvider, AutoCloseable {
public static final Logger LOG =
LoggerFactory.getLogger(GravitinoBasedIcebergCatalogWrapperProvider.class);

public static final String GRAVITINO_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME =
"gravitino-based-provider";
LoggerFactory.getLogger(DynamicIcebergCatalogConfigProvider.class);

private String gravitinoMetalake;

Expand All @@ -66,22 +64,27 @@ public void initialize(Map<String, String> properties) {
}

@Override
public IcebergCatalogWrapper getIcebergTableOps(String catalogName) {
public Optional<IcebergConfig> getIcebergCatalogConfig(String catalogName) {
Preconditions.checkArgument(
StringUtils.isNotBlank(catalogName), "blank catalogName is illegal");
Preconditions.checkArgument(
!IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName),
IcebergConstants.GRAVITINO_DEFAULT_CATALOG + " is illegal in gravitino-based-provider");

Catalog catalog = client.loadMetalake(gravitinoMetalake).loadCatalog(catalogName);
Catalog catalog;
try {
catalog = client.loadMetalake(gravitinoMetalake).loadCatalog(catalogName);
} catch (NoSuchCatalogException e) {
return Optional.empty();
}

Preconditions.checkArgument(
"lakehouse-iceberg".equals(catalog.provider()),
String.format("%s.%s is not iceberg catalog", gravitinoMetalake, catalogName));

Map<String, String> properties =
IcebergPropertiesUtils.toIcebergCatalogProperties(catalog.properties());
return new IcebergCatalogWrapper(new IcebergConfig(properties));
return Optional.of(new IcebergConfig(properties));
}

@VisibleForTesting
Expand Down
Loading

0 comments on commit e0fcf3a

Please sign in to comment.