From b8b61893d6d1d9020bb6c51360089e5f45c53210 Mon Sep 17 00:00:00 2001 From: yuqi Date: Thu, 13 Jun 2024 17:44:20 +0800 Subject: [PATCH 01/15] Support different level(catalog, schema,fileset) of authentication for Hadoop catalog --- catalogs/catalog-hadoop/build.gradle.kts | 3 +- .../hadoop/HadoopCatalogOperations.java | 590 ++++++++++++------ .../HadoopFilesetPropertiesMetadata.java | 9 +- .../catalog/hadoop/HadoopProxyPlugin.java | 2 +- .../HadoopSchemaPropertiesMetadata.java | 4 + .../authentication/AuthenticationConfig.java | 13 + .../kerberos/KerberosClient.java | 22 +- .../test/HadoopUserAuthenticationIT.java | 331 ++++++++++ .../gravitino/utils/PrincipalUtils.java | 13 +- 9 files changed, 772 insertions(+), 215 deletions(-) diff --git a/catalogs/catalog-hadoop/build.gradle.kts b/catalogs/catalog-hadoop/build.gradle.kts index 09f47dd075f..2ecb133244b 100644 --- a/catalogs/catalog-hadoop/build.gradle.kts +++ b/catalogs/catalog-hadoop/build.gradle.kts @@ -15,6 +15,7 @@ dependencies { implementation(project(":core")) implementation(project(":common")) + implementation(libs.cglib) implementation(libs.guava) implementation(libs.hadoop3.common) { exclude("com.sun.jersey") @@ -102,7 +103,7 @@ tasks.test { doFirst { environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12") - environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-kerberos-hive:0.1.2") + environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-kerberos-hive:test") } val init = project.extra.get("initIntegrationTest") as (Test) -> Unit diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index bf136d34edc..9ed78dec6aa 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -17,6 +17,7 @@ import com.datastrato.gravitino.StringIdentifier; import com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig; import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosClient; +import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig; import com.datastrato.gravitino.connector.CatalogInfo; import com.datastrato.gravitino.connector.CatalogOperations; import com.datastrato.gravitino.connector.HasPropertyMetadata; @@ -39,9 +40,12 @@ import com.datastrato.gravitino.utils.PrincipalUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.time.Instant; import java.util.Collections; import java.util.List; @@ -82,6 +86,12 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem private CatalogInfo catalogInfo; + private final List closeables = Lists.newArrayList(); + + private final Map userInfoMap = Maps.newConcurrentMap(); + + public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-%s"; + HadoopCatalogOperations(EntityStore store) { this.store = store; } @@ -94,6 +104,21 @@ public String getKerberosRealm() { return kerberosRealm; } + static class 
UserInfo { + UserGroupInformation loginUser; + boolean enableUserImpersonation; + String keytabPath; + + static UserInfo of( + UserGroupInformation loginUser, boolean enableUserImpersonation, String keytabPath) { + UserInfo userInfo = new UserInfo(); + userInfo.loginUser = loginUser; + userInfo.enableUserImpersonation = enableUserImpersonation; + userInfo.keytabPath = keytabPath; + return userInfo; + } + } + @Override public void initialize( Map config, CatalogInfo info, HasPropertyMetadata propertiesMetadata) @@ -125,20 +150,15 @@ public void initialize( private void initAuthentication(Map conf, Configuration hadoopConf) { AuthenticationConfig config = new AuthenticationConfig(conf); - String authType = config.getAuthType(); - if (StringUtils.equalsIgnoreCase(authType, AuthenticationMethod.KERBEROS.name())) { - hadoopConf.set( - HADOOP_SECURITY_AUTHENTICATION, - AuthenticationMethod.KERBEROS.name().toLowerCase(Locale.ROOT)); - UserGroupInformation.setConfiguration(hadoopConf); - try { - KerberosClient kerberosClient = new KerberosClient(conf, hadoopConf); - File keytabFile = kerberosClient.saveKeyTabFileFromUri(catalogInfo.id()); - this.kerberosRealm = kerberosClient.login(keytabFile.getAbsolutePath()); - } catch (IOException e) { - throw new RuntimeException("Failed to login with Kerberos", e); - } + if (config.isKerberosAuth()) { + String catalogKeyTablePath = String.format(GRAVITINO_KEYTAB_FORMAT, catalogInfo.id()); + this.kerberosRealm = + initKerberos( + catalogKeyTablePath, + conf, + hadoopConf, + NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name())); } } @@ -190,102 +210,121 @@ public Fileset createFileset( String storageLocation, Map properties) throws NoSuchSchemaException, FilesetAlreadyExistsException { - try { - if (store.exists(ident, Entity.EntityType.FILESET)) { - throw new FilesetAlreadyExistsException("Fileset %s already exists", ident); - } - } catch (IOException ioe) { - throw new RuntimeException("Failed to check if fileset " + ident + " exists", ioe); - } - - SchemaEntity schemaEntity; - NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels()); - try { - schemaEntity = store.get(schemaIdent, Entity.EntityType.SCHEMA, SchemaEntity.class); - } catch (NoSuchEntityException exception) { - throw new NoSuchSchemaException(exception, SCHEMA_DOES_NOT_EXIST_MSG, schemaIdent); - } catch (IOException ioe) { - throw new RuntimeException("Failed to load schema " + schemaIdent, ioe); - } - - // For external fileset, the storageLocation must be set. - if (type == Fileset.Type.EXTERNAL && StringUtils.isBlank(storageLocation)) { - throw new IllegalArgumentException( - "Storage location must be set for external fileset " + ident); - } - - // Either catalog property "location", or schema property "location", or storageLocation must be - // set for managed fileset. - Path schemaPath = getSchemaPath(schemaIdent.name(), schemaEntity.properties()); - if (schemaPath == null && StringUtils.isBlank(storageLocation)) { - throw new IllegalArgumentException( - "Storage location must be set for fileset " - + ident - + " when it's catalog and schema location are not set"); - } - - // The specified storageLocation will take precedence over the calculated one. - Path filesetPath = - StringUtils.isNotBlank(storageLocation) - ? new Path(storageLocation) - : new Path(schemaPath, ident.name()); - - try { - // formalize the path to avoid path without scheme, uri, authority, etc. 
- filesetPath = formalizePath(filesetPath, hadoopConf); - FileSystem fs = filesetPath.getFileSystem(hadoopConf); - if (!fs.exists(filesetPath)) { - if (!fs.mkdirs(filesetPath)) { - throw new RuntimeException( - "Failed to create fileset " + ident + " location " + filesetPath); - } - - LOG.info("Created fileset {} location {}", ident, filesetPath); - } else { - LOG.info("Fileset {} manages the existing location {}", ident, filesetPath); - } - - } catch (IOException ioe) { - throw new RuntimeException( - "Failed to create fileset " + ident + " location " + filesetPath, ioe); - } - - StringIdentifier stringId = StringIdentifier.fromProperties(properties); - Preconditions.checkArgument(stringId != null, "Property String identifier should not be null"); - - FilesetEntity filesetEntity = - FilesetEntity.builder() - .withName(ident.name()) - .withId(stringId.id()) - .withNamespace(ident.namespace()) - .withComment(comment) - .withFilesetType(type) - // Store the storageLocation to the store. If the "storageLocation" is null for - // managed fileset, Gravitino will get and store the location based on the - // catalog/schema's location and store it to the store. - .withStorageLocation(filesetPath.toString()) - .withProperties(properties) - .withAuditInfo( - AuditInfo.builder() - .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) - .withCreateTime(Instant.now()) - .build()) - .build(); + // Reset the current user based on the name identifier. + UserGroupInformation currentUser = getCurrentUser(properties, ident, true); try { - store.put(filesetEntity, true /* overwrite */); - } catch (IOException ioe) { - throw new RuntimeException("Failed to create fileset " + ident, ioe); + return currentUser.doAs( + (PrivilegedExceptionAction) + () -> { + try { + if (store.exists(ident, Entity.EntityType.FILESET)) { + throw new FilesetAlreadyExistsException("Fileset %s already exists", ident); + } + } catch (IOException ioe) { + throw new RuntimeException( + "Failed to check if fileset " + ident + " exists", ioe); + } + + SchemaEntity schemaEntity; + NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels()); + try { + schemaEntity = + store.get(schemaIdent, Entity.EntityType.SCHEMA, SchemaEntity.class); + } catch (NoSuchEntityException exception) { + throw new NoSuchSchemaException( + exception, SCHEMA_DOES_NOT_EXIST_MSG, schemaIdent); + } catch (IOException ioe) { + throw new RuntimeException("Failed to load schema " + schemaIdent, ioe); + } + + // For external fileset, the storageLocation must be set. + if (type == Fileset.Type.EXTERNAL && StringUtils.isBlank(storageLocation)) { + throw new IllegalArgumentException( + "Storage location must be set for external fileset " + ident); + } + + // Either catalog property "location", or schema property "location", or + // storageLocation must be + // set for managed fileset. + Path schemaPath = getSchemaPath(schemaIdent.name(), schemaEntity.properties()); + if (schemaPath == null && StringUtils.isBlank(storageLocation)) { + throw new IllegalArgumentException( + "Storage location must be set for fileset " + + ident + + " when it's catalog and schema location are not set"); + } + + // The specified storageLocation will take precedence over the calculated one. + Path filesetPath = + StringUtils.isNotBlank(storageLocation) + ? new Path(storageLocation) + : new Path(schemaPath, ident.name()); + + try { + // formalize the path to avoid path without scheme, uri, authority, etc. 
+ filesetPath = formalizePath(filesetPath, hadoopConf); + FileSystem fs = filesetPath.getFileSystem(hadoopConf); + if (!fs.exists(filesetPath)) { + if (!fs.mkdirs(filesetPath)) { + throw new RuntimeException( + "Failed to create fileset " + ident + " location " + filesetPath); + } + + LOG.info("Created fileset {} location {}", ident, filesetPath); + } else { + LOG.info("Fileset {} manages the existing location {}", ident, filesetPath); + } + + } catch (IOException ioe) { + throw new RuntimeException( + "Failed to create fileset " + ident + " location " + filesetPath, ioe); + } + + StringIdentifier stringId = StringIdentifier.fromProperties(properties); + Preconditions.checkArgument( + stringId != null, "Property String identifier should not be null"); + + FilesetEntity filesetEntity = + FilesetEntity.builder() + .withName(ident.name()) + .withId(stringId.id()) + .withNamespace(ident.namespace()) + .withComment(comment) + .withFilesetType(type) + // Store the storageLocation to the store. If the "storageLocation" is null + // for + // managed fileset, Gravitino will get and store the location based on the + // catalog/schema's location and store it to the store. + .withStorageLocation(filesetPath.toString()) + .withProperties(properties) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + + try { + store.put(filesetEntity, true /* overwrite */); + } catch (IOException ioe) { + throw new RuntimeException("Failed to create fileset " + ident, ioe); + } + + return HadoopFileset.builder() + .withName(ident.name()) + .withComment(comment) + .withType(type) + .withStorageLocation(filesetPath.toString()) + .withProperties(filesetEntity.properties()) + .withAuditInfo(filesetEntity.auditInfo()) + .build(); + }); + } catch (InterruptedException e) { + throw new RuntimeException("Failed to create fileset " + ident, e); + } catch (IOException e) { + throw new RuntimeException(e); } - - return HadoopFileset.builder() - .withName(ident.name()) - .withComment(comment) - .withType(type) - .withStorageLocation(filesetPath.toString()) - .withProperties(filesetEntity.properties()) - .withAuditInfo(filesetEntity.auditInfo()) - .build(); } @Override @@ -329,31 +368,41 @@ public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes) @Override public boolean dropFileset(NameIdentifier ident) { + try { FilesetEntity filesetEntity = store.get(ident, Entity.EntityType.FILESET, FilesetEntity.class); Path filesetPath = new Path(filesetEntity.storageLocation()); - // For managed fileset, we should delete the related files. - if (filesetEntity.filesetType() == Fileset.Type.MANAGED) { - FileSystem fs = filesetPath.getFileSystem(hadoopConf); - if (fs.exists(filesetPath)) { - if (!fs.delete(filesetPath, true)) { - LOG.warn("Failed to delete fileset {} location {}", ident, filesetPath); - return false; - } - - } else { - LOG.warn("Fileset {} location {} does not exist", ident, filesetPath); - } - } - - return store.delete(ident, Entity.EntityType.FILESET); + // Reset the current user based on the name identifier. + UserGroupInformation currentUser = getCurrentUser(filesetEntity.properties(), ident, true); + + return currentUser.doAs( + (PrivilegedExceptionAction) + () -> { + // For managed fileset, we should delete the related files. 
+ if (filesetEntity.filesetType() == Fileset.Type.MANAGED) { + FileSystem fs = filesetPath.getFileSystem(hadoopConf); + if (fs.exists(filesetPath)) { + if (!fs.delete(filesetPath, true)) { + LOG.warn("Failed to delete fileset {} location {}", ident, filesetPath); + return false; + } + + } else { + LOG.warn("Fileset {} location {} does not exist", ident, filesetPath); + } + } + + boolean success = store.delete(ident, Entity.EntityType.FILESET); + cleanUserInfo(ident); + return success; + }); } catch (NoSuchEntityException ne) { LOG.warn("Fileset {} does not exist", ident); return false; - } catch (IOException ioe) { - throw new RuntimeException("Failed to delete fileset " + ident, ioe); + } catch (IOException | InterruptedException e) { + throw new RuntimeException("Failed to delete fileset " + ident, e); } } @@ -370,66 +419,114 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc } } - @Override - public Schema createSchema(NameIdentifier ident, String comment, Map properties) - throws NoSuchCatalogException, SchemaAlreadyExistsException { - try { - if (store.exists(ident, Entity.EntityType.SCHEMA)) { - throw new SchemaAlreadyExistsException("Schema %s already exists", ident); - } - } catch (IOException ioe) { - throw new RuntimeException("Failed to check if schema " + ident + " exists", ioe); - } + public String initKerberos( + String keytabPath, + Map properties, + Configuration configuration, + NameIdentifier ident) { + // Init schema level kerberos authentication. + KerberosConfig kerberosConfig = new KerberosConfig(properties); + if (kerberosConfig.isKerberosAuth()) { + configuration.set( + HADOOP_SECURITY_AUTHENTICATION, + AuthenticationMethod.KERBEROS.name().toLowerCase(Locale.ROOT)); + UserGroupInformation.setConfiguration(configuration); - Path schemaPath = getSchemaPath(ident.name(), properties); - if (schemaPath != null) { try { - FileSystem fs = schemaPath.getFileSystem(hadoopConf); - if (!fs.exists(schemaPath)) { - if (!fs.mkdirs(schemaPath)) { - // Fail the operation when failed to create the schema path. - throw new RuntimeException( - "Failed to create schema " + ident + " location " + schemaPath); - } - LOG.info("Created schema {} location {}", ident, schemaPath); - } else { - LOG.info("Schema {} manages the existing location {}", ident, schemaPath); - } - - } catch (IOException ioe) { - throw new RuntimeException( - "Failed to create schema " + ident + " location " + schemaPath, ioe); + KerberosClient kerberosClient = new KerberosClient(properties, configuration); + // Add the kerberos client to closable to close resources. + closeables.add(kerberosClient); + + File keytabFile = kerberosClient.saveKeyTabFileFromUri(keytabPath); + kerberosRealm = kerberosClient.login(keytabFile.getAbsolutePath()); + // Should this kerberosRealm need to be equals to the realm in the principal? 
+ userInfoMap.put( + ident, + UserInfo.of( + UserGroupInformation.getLoginUser(), + kerberosConfig.isImpersonationEnabled(), + keytabPath)); + return kerberosRealm; + } catch (IOException e) { + throw new RuntimeException("Failed to login with Kerberos", e); } } - StringIdentifier stringId = StringIdentifier.fromProperties(properties); - Preconditions.checkNotNull(stringId, "Property String identifier should not be null"); - - SchemaEntity schemaEntity = - SchemaEntity.builder() - .withName(ident.name()) - .withId(stringId.id()) - .withNamespace(ident.namespace()) - .withComment(comment) - .withProperties(properties) - .withAuditInfo( - AuditInfo.builder() - .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) - .withCreateTime(Instant.now()) - .build()) - .build(); + return null; + } + + @Override + public Schema createSchema(NameIdentifier ident, String comment, Map properties) + throws NoSuchCatalogException, SchemaAlreadyExistsException { + + // Reset the current user based on the name identifier and properties. + UserGroupInformation currentUser = getCurrentUser(properties, ident, true); try { - store.put(schemaEntity, true /* overwrite */); - } catch (IOException ioe) { - throw new RuntimeException("Failed to create schema " + ident, ioe); + return currentUser.doAs( + (PrivilegedExceptionAction) + () -> { + try { + if (store.exists(ident, Entity.EntityType.SCHEMA)) { + throw new SchemaAlreadyExistsException("Schema %s already exists", ident); + } + } catch (IOException ioe) { + throw new RuntimeException("Failed to check if schema " + ident + " exists", ioe); + } + + Path schemaPath = getSchemaPath(ident.name(), properties); + if (schemaPath != null) { + try { + FileSystem fs = schemaPath.getFileSystem(hadoopConf); + if (!fs.exists(schemaPath)) { + if (!fs.mkdirs(schemaPath)) { + // Fail the operation when failed to create the schema path. 
+ throw new RuntimeException( + "Failed to create schema " + ident + " location " + schemaPath); + } + LOG.info("Created schema {} location {}", ident, schemaPath); + } else { + LOG.info("Schema {} manages the existing location {}", ident, schemaPath); + } + + } catch (IOException ioe) { + throw new RuntimeException( + "Failed to create schema " + ident + " location " + schemaPath, ioe); + } + } + + StringIdentifier stringId = StringIdentifier.fromProperties(properties); + Preconditions.checkNotNull( + stringId, "Property String identifier should not be null"); + + SchemaEntity schemaEntity = + SchemaEntity.builder() + .withName(ident.name()) + .withId(stringId.id()) + .withNamespace(ident.namespace()) + .withComment(comment) + .withProperties(properties) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + try { + store.put(schemaEntity, true /* overwrite */); + } catch (IOException ioe) { + throw new RuntimeException("Failed to create schema " + ident, ioe); + } + + return HadoopSchema.builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(schemaEntity.properties()) + .withAuditInfo(schemaEntity.auditInfo()) + .build(); + }); + } catch (IOException | InterruptedException e) { + throw new RuntimeException("Failed to create schema " + ident, e); } - - return HadoopSchema.builder() - .withName(ident.name()) - .withComment(comment) - .withProperties(schemaEntity.properties()) - .withAuditInfo(schemaEntity.auditInfo()) - .build(); } @Override @@ -497,35 +594,52 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty Map properties = Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap()); - Path schemaPath = getSchemaPath(ident.name(), properties); - // Nothing to delete if the schema path is not set. - if (schemaPath == null) { - return false; - } - - FileSystem fs = schemaPath.getFileSystem(hadoopConf); - // Nothing to delete if the schema path does not exist. - if (!fs.exists(schemaPath)) { - return false; - } - - if (fs.listStatus(schemaPath).length > 0 && !cascade) { - throw new NonEmptySchemaException( - "Schema %s with location %s is not empty", ident, schemaPath); - } else { - fs.delete(schemaPath, true); - } - - LOG.info("Deleted schema {} location {}", ident, schemaPath); - return true; - - } catch (IOException ioe) { - throw new RuntimeException("Failed to delete schema " + ident + " location", ioe); + // Reset current user based on the name identifier. + UserGroupInformation user = getCurrentUser(schemaEntity.properties(), ident, true); + + return user.doAs( + (PrivilegedExceptionAction) + () -> { + Path schemaPath = getSchemaPath(ident.name(), properties); + // Nothing to delete if the schema path is not set. + if (schemaPath == null) { + return false; + } + + FileSystem fs = schemaPath.getFileSystem(hadoopConf); + // Nothing to delete if the schema path does not exist. 
+ if (!fs.exists(schemaPath)) { + return false; + } + + if (fs.listStatus(schemaPath).length > 0 && !cascade) { + throw new NonEmptySchemaException( + "Schema %s with location %s is not empty", ident, schemaPath); + } else { + fs.delete(schemaPath, true); + } + + cleanUserInfo(ident); + LOG.info("Deleted schema {} location {}", ident, schemaPath); + return true; + }); + } catch (IOException | InterruptedException e) { + throw new RuntimeException("Failed to delete schema " + ident + " location", e); } } @Override - public void close() throws IOException {} + public void close() throws IOException { + userInfoMap.clear(); + closeables.forEach( + c -> { + try { + c.close(); + } catch (IOException e) { + LOG.error("Failed to close resource", e); + } + }); + } private SchemaEntity updateSchemaEntity( NameIdentifier ident, SchemaEntity schemaEntity, SchemaChange... changes) { @@ -628,4 +742,78 @@ static Path formalizePath(Path path, Configuration configuration) throws IOExcep void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) { this.proxyPlugin = hadoopProxyPlugin; } + + public UserGroupInformation getCurrentUser( + Map properties, NameIdentifier ident, boolean needRelogin) { + KerberosConfig kerberosConfig = new KerberosConfig(properties); + if (kerberosConfig.isKerberosAuth() && needRelogin) { + // We assume that the realm of catalog is the same as the realm of the schema and table. + String keytabPath = + String.format( + GRAVITINO_KEYTAB_FORMAT, catalogInfo.id() + "-" + ident.toString().replace(".", "-")); + initKerberos(keytabPath, properties, new Configuration(), ident); + } + + return getUserBaseOnNameIdentifier(ident); + } + + private UserGroupInformation getUserBaseOnNameIdentifier(NameIdentifier nameIdentifier) { + UserInfo userInfo = getNearestUserGroupInformation(nameIdentifier); + if (userInfo == null) { + try { + return UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new IllegalStateException("Fail to get currentUser", e); + } + } + + UserGroupInformation ugi = userInfo.loginUser; + boolean userImpersonation = userInfo.enableUserImpersonation; + if (userImpersonation) { + String proxyKerberosPrincipalName = PrincipalUtils.getCurrentUserName(); + if (!proxyKerberosPrincipalName.contains("@")) { + proxyKerberosPrincipalName = + String.format("%s@%s", proxyKerberosPrincipalName, kerberosRealm); + } + + ugi = UserGroupInformation.createProxyUser(proxyKerberosPrincipalName, ugi); + } + + return ugi; + } + + private UserInfo getNearestUserGroupInformation(NameIdentifier nameIdentifier) { + NameIdentifier currentNameIdentifier = nameIdentifier; + while (currentNameIdentifier != null) { + if (userInfoMap.containsKey(currentNameIdentifier)) { + return userInfoMap.get(currentNameIdentifier); + } + + String[] levels = currentNameIdentifier.namespace().levels(); + // The ident is catalog level. 
+ if (levels.length <= 1) { + return null; + } + currentNameIdentifier = NameIdentifier.of(currentNameIdentifier.namespace().levels()); + } + return null; + } + + private void cleanUserInfo(NameIdentifier identifier) { + UserInfo userInfo = userInfoMap.remove(identifier); + if (userInfo != null) { + removeFile(userInfo.keytabPath); + } + } + + private void removeFile(String filePath) { + if (filePath == null) { + return; + } + + File file = new File(filePath); + if (file.exists()) { + file.delete(); + } + } } diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java index 218d69b30f6..4ad31d1dcb5 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java @@ -4,15 +4,20 @@ */ package com.datastrato.gravitino.catalog.hadoop; +import com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig; +import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig; import com.datastrato.gravitino.connector.BasePropertiesMetadata; import com.datastrato.gravitino.connector.PropertyEntry; -import java.util.Collections; +import com.google.common.collect.ImmutableMap; import java.util.Map; public class HadoopFilesetPropertiesMetadata extends BasePropertiesMetadata { @Override protected Map> specificPropertyEntries() { - return Collections.emptyMap(); + ImmutableMap.Builder> builder = ImmutableMap.builder(); + builder.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES); + builder.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES); + return builder.build(); } } diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java index 471f6a165cd..72f8a580520 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java @@ -19,7 +19,7 @@ public class HadoopProxyPlugin implements ProxyPlugin { private HadoopCatalogOperations ops; - private UserGroupInformation realUser; + private final UserGroupInformation realUser; public HadoopProxyPlugin() { try { diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java index 5cf23913a69..00f6cfb1f11 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java @@ -4,6 +4,8 @@ */ package com.datastrato.gravitino.catalog.hadoop; +import com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig; +import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig; import com.datastrato.gravitino.connector.BasePropertiesMetadata; import com.datastrato.gravitino.connector.PropertyEntry; import com.google.common.collect.ImmutableMap; @@ -31,6 +33,8 @@ public 
class HadoopSchemaPropertiesMetadata extends BasePropertiesMetadata { true /* immutable */, null, false /* hidden */)) + .putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES) + .putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES) .build(); @Override diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java index 7655623462d..b39446ef0ac 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java @@ -21,6 +21,11 @@ public class AuthenticationConfig extends Config { public static final String IMPERSONATION_ENABLE_KEY = "authentication.impersonation-enable"; + enum AuthenticationType { + SIMPLE, + KERBEROS + } + public AuthenticationConfig(Map properties) { super(false); loadFromMap(properties, k -> true); @@ -49,6 +54,14 @@ public boolean isImpersonationEnabled() { return get(ENABLE_IMPERSONATION_ENTRY); } + public boolean isSimpleAuth() { + return AuthenticationType.SIMPLE.name().equalsIgnoreCase(getAuthType()); + } + + public boolean isKerberosAuth() { + return AuthenticationType.KERBEROS.name().equalsIgnoreCase(getAuthType()); + } + public static final Map> AUTHENTICATION_PROPERTY_ENTRIES = new ImmutableMap.Builder>() .put( diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java index 3930380e876..27e3da89450 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java @@ -8,6 +8,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.List; @@ -22,11 +23,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KerberosClient { +public class KerberosClient implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); - public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-%s-keytab"; - private final ScheduledThreadPoolExecutor checkTgtExecutor; private final Map conf; private final Configuration hadoopConf; @@ -52,8 +51,9 @@ public String login(String keytabFilePath) throws IOException { // Login UserGroupInformation.setConfiguration(hadoopConf); KerberosName.resetDefaultRealm(); - UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); - UserGroupInformation kerberosLoginUgi = UserGroupInformation.getCurrentUser(); + UserGroupInformation kerberosLoginUgi = + UserGroupInformation.loginUserFromKeytabAndReturnUGI(catalogPrincipal, keytabFilePath); + UserGroupInformation.setLoginUser(kerberosLoginUgi); // Refresh the cache if it's out of date. 
int checkInterval = kerberosConfig.getCheckIntervalSec(); @@ -72,8 +72,7 @@ public String login(String keytabFilePath) throws IOException { return principalComponents.get(1); } - public File saveKeyTabFileFromUri(Long catalogId) throws IOException { - + public File saveKeyTabFileFromUri(String keytabPath) throws IOException { KerberosConfig kerberosConfig = new KerberosConfig(conf); String keyTabUri = kerberosConfig.getKeytab(); @@ -89,7 +88,7 @@ public File saveKeyTabFileFromUri(Long catalogId) throws IOException { keytabsDir.mkdir(); } - File keytabFile = new File(String.format(GRAVITINO_KEYTAB_FORMAT, catalogId)); + File keytabFile = new File(keytabPath); keytabFile.deleteOnExit(); if (keytabFile.exists() && !keytabFile.delete()) { throw new IllegalStateException( @@ -105,4 +104,11 @@ public File saveKeyTabFileFromUri(Long catalogId) throws IOException { private static ThreadFactory getThreadFactory(String factoryName) { return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + "-%d").build(); } + + @Override + public void close() throws IOException { + if (checkTgtExecutor != null) { + checkTgtExecutor.shutdown(); + } + } } diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java index e35902145b5..0756b628b89 100644 --- a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java +++ b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java @@ -28,6 +28,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -38,6 +40,7 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.utility.MountableFile; import sun.security.krb5.KrbException; @Tag("gravitino-docker-it") @@ -58,6 +61,15 @@ public class HadoopUserAuthenticationIT extends AbstractIT { private static final String HADOOP_CLIENT_PRINCIPAL = "cli@HADOOPKRB"; private static final String HADOOP_CLIENT_KEYTAB = "/client.keytab"; + private static final String HADOOP_SCHEMA_PRINCIPAL = "cli_schema"; + private static final String HADOOP_FILESET_PRINCIPAL = "cli_fileset"; + + private static final String HADOOP_SCHEMA_KEYTAB = "/cli_schema.keytab"; + private static final String HADOOP_FILESET_KEYTAB = "/cli_fileset.keytab"; + + private static final String REALM = "HADOOPKRB"; + private static final String ADMIN_PASSWORD = "Admin12!"; + private static String TMP_DIR; private static String HDFS_URL; @@ -113,6 +125,8 @@ private static void prepareKerberosConfig() throws IOException, KrbException { .getContainer() .copyFileFromContainer("/gravitino_client.keytab", TMP_DIR + GRAVITINO_CLIENT_KEYTAB); + createKeyTableForSchemaAndFileset(); + // Keytab of the Gravitino server kerberosHiveContainer .getContainer() @@ -139,6 +153,59 @@ private static void prepareKerberosConfig() throws IOException, KrbException { System.setProperty("sun.security.krb5.debug", "true"); } + private static void createKeyTableForSchemaAndFileset() throws IOException { + String shellContent = + "echo -e \"%s\\n%s\" | 
kadmin.local -q \"addprinc %s@%s\"" + + "\n" + + "kadmin.local -q \"xst -k /%s.keytab -norandkey %s@%s\""; + + String createSchemaShellFile = String.format("/%s.sh", HADOOP_SCHEMA_PRINCIPAL); + String createFileSetShellFile = String.format("/%s.sh", HADOOP_FILESET_PRINCIPAL); + + FileUtils.writeStringToFile( + Paths.get(TMP_DIR + createSchemaShellFile).toFile(), + String.format( + shellContent, + ADMIN_PASSWORD, + ADMIN_PASSWORD, + HADOOP_SCHEMA_PRINCIPAL, + REALM, + HADOOP_SCHEMA_PRINCIPAL, + HADOOP_SCHEMA_PRINCIPAL, + REALM), + StandardCharsets.UTF_8); + kerberosHiveContainer + .getContainer() + .copyFileToContainer( + MountableFile.forHostPath(TMP_DIR + createSchemaShellFile), createSchemaShellFile); + kerberosHiveContainer.executeInContainer("bash", createSchemaShellFile); + + FileUtils.writeStringToFile( + Paths.get(TMP_DIR + createFileSetShellFile).toFile(), + String.format( + shellContent, + ADMIN_PASSWORD, + ADMIN_PASSWORD, + HADOOP_FILESET_PRINCIPAL, + REALM, + HADOOP_FILESET_PRINCIPAL, + HADOOP_FILESET_PRINCIPAL, + REALM), + StandardCharsets.UTF_8); + kerberosHiveContainer + .getContainer() + .copyFileToContainer( + MountableFile.forHostPath(TMP_DIR + createFileSetShellFile), createFileSetShellFile); + kerberosHiveContainer.executeInContainer("bash", createFileSetShellFile); + + kerberosHiveContainer + .getContainer() + .copyFileFromContainer(HADOOP_SCHEMA_KEYTAB, TMP_DIR + HADOOP_SCHEMA_KEYTAB); + kerberosHiveContainer + .getContainer() + .copyFileFromContainer(HADOOP_FILESET_KEYTAB, TMP_DIR + HADOOP_FILESET_KEYTAB); + } + private static void addKerberosConfig() { AbstractIT.customConfigs.put("gravitino.authenticator", "kerberos"); AbstractIT.customConfigs.put( @@ -211,4 +278,268 @@ public void testUserAuthentication() { catalog.asSchemas().dropSchema(SCHEMA_NAME, true); } + + @Test + void testCreateSchemaWithKerberos() { + KerberosTokenProvider provider = + KerberosTokenProvider.builder() + .withClientPrincipal(GRAVITINO_CLIENT_PRINCIPAL) + .withKeyTabFile(new File(TMP_DIR + GRAVITINO_CLIENT_KEYTAB)) + .build(); + adminClient = GravitinoAdminClient.builder(serverUri).withKerberosAuth(provider).build(); + + String metalakeName = GravitinoITUtils.genRandomName("metalake"); + String catalogName = GravitinoITUtils.genRandomName("catalog"); + GravitinoMetalake gravitinoMetalake = + adminClient.createMetalake(metalakeName, null, ImmutableMap.of()); + + // Create a catalog + Map properties = Maps.newHashMap(); + String location = HDFS_URL + "/user/hadoop/" + catalogName; + + properties.put(AUTH_TYPE_KEY, "kerberos"); + properties.put(IMPERSONATION_ENABLE_KEY, "true"); + properties.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_CLIENT_KEYTAB); + properties.put(PRINCIPAL_KEY, HADOOP_CLIENT_PRINCIPAL); + properties.put("location", location); + + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-mkdir", "-p", "/user/hadoop/" + catalogName); + + Catalog catalog = + gravitinoMetalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "comment", properties); + + // Test create schema + Exception exception = + Assertions.assertThrows( + Exception.class, + () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", ImmutableMap.of())); + String exceptionMessage = Throwables.getStackTraceAsString(exception); + // Make sure real user is 'gravitino_client' + Assertions.assertTrue( + exceptionMessage.contains("Permission denied: user=gravitino_client, access=WRITE")); + + Map schemaProperty = new HashMap<>(); + schemaProperty.put(AUTH_TYPE_KEY, "kerberos"); + // Disable 
impersonation here, so the user is the same as the principal + schemaProperty.put(IMPERSONATION_ENABLE_KEY, "false"); + schemaProperty.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_SCHEMA_KEYTAB); + schemaProperty.put(PRINCIPAL_KEY, HADOOP_SCHEMA_PRINCIPAL + "@" + REALM); + + exception = + Assertions.assertThrows( + Exception.class, + () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", schemaProperty)); + exceptionMessage = Throwables.getStackTraceAsString(exception); + // Make sure real user is 'cli_schema' + Assertions.assertTrue( + exceptionMessage.contains("Permission denied: user=cli_schema, access=WRITE")); + + // enable user impersonation, so the real user is gravitino_client + schemaProperty.put(IMPERSONATION_ENABLE_KEY, "true"); + exception = + Assertions.assertThrows( + Exception.class, + () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", schemaProperty)); + exceptionMessage = Throwables.getStackTraceAsString(exception); + // Make sure real user is 'gravitino_client' if user impersonation enabled. + Assertions.assertTrue( + exceptionMessage.contains("Permission denied: user=gravitino_client, access=WRITE")); + + // Now try to give the user the permission to create schema again + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-chown", "gravitino_client", "/user/hadoop/" + catalogName); + Assertions.assertDoesNotThrow( + () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", schemaProperty)); + + // Disable impersonation here, so the user is the same as the principal 'cli_schema' + schemaProperty.put(IMPERSONATION_ENABLE_KEY, "false"); + exception = + Assertions.assertThrows( + Exception.class, + () -> + catalog.asSchemas().createSchema(SCHEMA_NAME + "_new", "comment", schemaProperty)); + exceptionMessage = Throwables.getStackTraceAsString(exception); + Assertions.assertTrue( + exceptionMessage.contains("Permission denied: user=cli_schema, access=WRITE")); + + // END of test schema creation + Assertions.assertDoesNotThrow(() -> catalog.asSchemas().dropSchema(SCHEMA_NAME, true)); + + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-chown", "cli_schema", "/user/hadoop/" + catalogName); + Assertions.assertDoesNotThrow( + () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", schemaProperty)); + + catalog + .asFilesetCatalog() + .createFileset( + NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, TABLE_NAME), + "comment", + Fileset.Type.MANAGED, + null, + ImmutableMap.of()); + } + + @Test + void createTableWithKerberos() { + KerberosTokenProvider provider = + KerberosTokenProvider.builder() + .withClientPrincipal(GRAVITINO_CLIENT_PRINCIPAL) + .withKeyTabFile(new File(TMP_DIR + GRAVITINO_CLIENT_KEYTAB)) + .build(); + adminClient = GravitinoAdminClient.builder(serverUri).withKerberosAuth(provider).build(); + + String metalakeName = GravitinoITUtils.genRandomName("metalake"); + String catalogName = GravitinoITUtils.genRandomName("catalog"); + GravitinoMetalake gravitinoMetalake = + adminClient.createMetalake(metalakeName, null, ImmutableMap.of()); + + // Create a catalog + Map properties = Maps.newHashMap(); + String localtion = HDFS_URL + "/user/hadoop/" + catalogName; + + properties.put(AUTH_TYPE_KEY, "kerberos"); + properties.put(IMPERSONATION_ENABLE_KEY, "true"); + properties.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_CLIENT_KEYTAB); + properties.put(PRINCIPAL_KEY, HADOOP_CLIENT_PRINCIPAL); + properties.put("location", localtion); + + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-mkdir", "-p", "/user/hadoop/" 
+ catalogName); + + Catalog catalog = + gravitinoMetalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "comment", properties); + + Map schemaProperty = new HashMap<>(); + schemaProperty.put(AUTH_TYPE_KEY, "kerberos"); + // Disable impersonation here, so the user is the same as the principal as 'cli_schema' + schemaProperty.put(IMPERSONATION_ENABLE_KEY, "false"); + schemaProperty.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_SCHEMA_KEYTAB); + schemaProperty.put(PRINCIPAL_KEY, HADOOP_SCHEMA_PRINCIPAL + "@" + REALM); + + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-chown", "-R", "cli_schema", "/user/hadoop/" + catalogName); + Assertions.assertDoesNotThrow( + () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", schemaProperty)); + + catalog + .asFilesetCatalog() + .createFileset( + NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, TABLE_NAME), + "comment", + Fileset.Type.MANAGED, + null, + ImmutableMap.of()); + + Map tableProperty = Maps.newHashMap(); + tableProperty.put(AUTH_TYPE_KEY, "kerberos"); + // Disable impersonation here, so the user is the same as the principal as 'cli_schema' + tableProperty.put(IMPERSONATION_ENABLE_KEY, "false"); + tableProperty.put(KEY_TAB_URI_KEY, TMP_DIR + HADOOP_FILESET_KEYTAB); + tableProperty.put(PRINCIPAL_KEY, HADOOP_FILESET_PRINCIPAL + "@" + REALM); + + String fileset1 = GravitinoITUtils.genRandomName("fileset1"); + Exception exception = + Assertions.assertThrows( + Exception.class, + () -> + catalog + .asFilesetCatalog() + .createFileset( + NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset1), + "comment", + Fileset.Type.MANAGED, + null, + tableProperty)); + String exceptionMessage = Throwables.getStackTraceAsString(exception); + Assertions.assertTrue( + exceptionMessage.contains("Permission denied: user=cli_fileset, access=WRITE")); + + // Now change the owner of schema directory to 'cli_fileset' + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-chown", "-R", "cli_fileset", "/user/hadoop/" + catalogName); + Assertions.assertDoesNotThrow( + () -> + catalog + .asFilesetCatalog() + .createFileset( + NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset1), + "comment", + Fileset.Type.MANAGED, + null, + tableProperty)); + + // enable user impersonation, so the real user is gravitino_client + tableProperty.put(IMPERSONATION_ENABLE_KEY, "true"); + String fileset2 = GravitinoITUtils.genRandomName("fileset2"); + exception = + Assertions.assertThrows( + Exception.class, + () -> + catalog + .asFilesetCatalog() + .createFileset( + NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset2), + "comment", + Fileset.Type.MANAGED, + null, + tableProperty)); + exceptionMessage = Throwables.getStackTraceAsString(exception); + Assertions.assertTrue( + exceptionMessage.contains("Permission denied: user=gravitino_client, access=WRITE")); + + // Now change the owner of schema directory to 'gravitino_client' + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-chown", "-R", "gravitino_client", "/user/hadoop/" + catalogName); + Assertions.assertDoesNotThrow( + () -> + catalog + .asFilesetCatalog() + .createFileset( + NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset2), + "comment", + Fileset.Type.MANAGED, + null, + tableProperty)); + + // As the owner of the current schema directory is 'gravitino_client', the current user is + // cli_fileset. 
+ Assertions.assertThrows( + Exception.class, + () -> + catalog + .asFilesetCatalog() + .dropFileset(NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset1))); + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-chown", "-R", "cli_fileset", "/user/hadoop/" + catalogName); + // Now try to drop fileset + Assertions.assertDoesNotThrow( + () -> + catalog + .asFilesetCatalog() + .dropFileset(NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset1))); + + Assertions.assertThrows( + Exception.class, + () -> + catalog + .asFilesetCatalog() + .dropFileset(NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset2))); + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-chown", "-R", "gravitino_client", "/user/hadoop/" + catalogName); + Assertions.assertDoesNotThrow( + () -> + catalog + .asFilesetCatalog() + .dropFileset(NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset2))); + + Assertions.assertThrows( + Exception.class, () -> catalog.asSchemas().dropSchema(SCHEMA_NAME, true)); + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-chown", "-R", "cli_schema", "/user/hadoop/" + catalogName); + Assertions.assertDoesNotThrow(() -> catalog.asSchemas().dropSchema(SCHEMA_NAME, true)); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/utils/PrincipalUtils.java b/core/src/main/java/com/datastrato/gravitino/utils/PrincipalUtils.java index d8a3255f1e1..d09c9398b34 100644 --- a/core/src/main/java/com/datastrato/gravitino/utils/PrincipalUtils.java +++ b/core/src/main/java/com/datastrato/gravitino/utils/PrincipalUtils.java @@ -33,10 +33,19 @@ public static T doAs(Principal principal, PrivilegedExceptionAction actio public static Principal getCurrentPrincipal() { java.security.AccessControlContext context = java.security.AccessController.getContext(); Subject subject = Subject.getSubject(context); - if (subject == null || subject.getPrincipals(UserPrincipal.class).isEmpty()) { + if (subject == null) { return new UserPrincipal(AuthConstants.ANONYMOUS_USER); } - return subject.getPrincipals(UserPrincipal.class).iterator().next(); + + if (!subject.getPrincipals(UserPrincipal.class).isEmpty()) { + return subject.getPrincipals(UserPrincipal.class).iterator().next(); + } + + if (!subject.getPrincipals().isEmpty()) { + return new UserPrincipal(subject.getPrincipals().iterator().next().getName()); + } + + return new UserPrincipal(AuthConstants.ANONYMOUS_USER); } public static String getCurrentUserName() { From a9773682cd4ffdc7ef8c2d0b9e57d92dfb92474e Mon Sep 17 00:00:00 2001 From: yuqi Date: Thu, 13 Jun 2024 17:48:20 +0800 Subject: [PATCH 02/15] fix --- catalogs/catalog-hadoop/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catalogs/catalog-hadoop/build.gradle.kts b/catalogs/catalog-hadoop/build.gradle.kts index 2ecb133244b..8b881bde943 100644 --- a/catalogs/catalog-hadoop/build.gradle.kts +++ b/catalogs/catalog-hadoop/build.gradle.kts @@ -103,7 +103,7 @@ tasks.test { doFirst { environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12") - environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-kerberos-hive:test") + environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-kerberos-hive:0.1.3") } val init = project.extra.get("initIntegrationTest") as (Test) -> Unit From 0737a63cddda97021412cfbe97e97b426455e98a Mon Sep 17 00:00:00 2001 From: yuqi Date: Fri, 14 Jun 2024 09:17:30 +0800 Subject: [PATCH 03/15] fix --- 
.../hadoop/HadoopCatalogOperations.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index 9ed78dec6aa..ff0f042c696 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -212,7 +212,8 @@ public Fileset createFileset( throws NoSuchSchemaException, FilesetAlreadyExistsException { // Reset the current user based on the name identifier. - UserGroupInformation currentUser = getCurrentUser(properties, ident, true); + UserGroupInformation currentUser = getCurrentUser(properties, ident); + String apiLoginUser = PrincipalUtils.getCurrentPrincipal().getName(); try { return currentUser.doAs( (PrivilegedExceptionAction) @@ -300,7 +301,7 @@ public Fileset createFileset( .withProperties(properties) .withAuditInfo( AuditInfo.builder() - .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreator(apiLoginUser) .withCreateTime(Instant.now()) .build()) .build(); @@ -375,7 +376,7 @@ public boolean dropFileset(NameIdentifier ident) { Path filesetPath = new Path(filesetEntity.storageLocation()); // Reset the current user based on the name identifier. - UserGroupInformation currentUser = getCurrentUser(filesetEntity.properties(), ident, true); + UserGroupInformation currentUser = getCurrentUser(filesetEntity.properties(), ident); return currentUser.doAs( (PrivilegedExceptionAction) @@ -459,8 +460,9 @@ public String initKerberos( public Schema createSchema(NameIdentifier ident, String comment, Map properties) throws NoSuchCatalogException, SchemaAlreadyExistsException { + String apiLoginUser = PrincipalUtils.getCurrentPrincipal().getName(); // Reset the current user based on the name identifier and properties. - UserGroupInformation currentUser = getCurrentUser(properties, ident, true); + UserGroupInformation currentUser = getCurrentUser(properties, ident); try { return currentUser.doAs( (PrivilegedExceptionAction) @@ -507,7 +509,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map) @@ -743,10 +745,9 @@ void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) { this.proxyPlugin = hadoopProxyPlugin; } - public UserGroupInformation getCurrentUser( - Map properties, NameIdentifier ident, boolean needRelogin) { + public UserGroupInformation getCurrentUser(Map properties, NameIdentifier ident) { KerberosConfig kerberosConfig = new KerberosConfig(properties); - if (kerberosConfig.isKerberosAuth() && needRelogin) { + if (kerberosConfig.isKerberosAuth()) { // We assume that the realm of catalog is the same as the realm of the schema and table. 
String keytabPath = String.format( From e5e44f5f6eba6cf3d2b42643283f378e8d30cb06 Mon Sep 17 00:00:00 2001 From: yuqi Date: Wed, 26 Jun 2024 15:16:11 +0800 Subject: [PATCH 04/15] Fix the possible thread problem about Kerberos users --- .../hadoop/HadoopCatalogOperations.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index ff0f042c696..f0d4af31fed 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -212,7 +212,7 @@ public Fileset createFileset( throws NoSuchSchemaException, FilesetAlreadyExistsException { // Reset the current user based on the name identifier. - UserGroupInformation currentUser = getCurrentUser(properties, ident); + UserGroupInformation currentUser = getUGIByIdent(properties, ident); String apiLoginUser = PrincipalUtils.getCurrentPrincipal().getName(); try { return currentUser.doAs( @@ -376,7 +376,7 @@ public boolean dropFileset(NameIdentifier ident) { Path filesetPath = new Path(filesetEntity.storageLocation()); // Reset the current user based on the name identifier. - UserGroupInformation currentUser = getCurrentUser(filesetEntity.properties(), ident); + UserGroupInformation currentUser = getUGIByIdent(filesetEntity.properties(), ident); return currentUser.doAs( (PrivilegedExceptionAction) @@ -420,7 +420,13 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc } } - public String initKerberos( + /** + * Get the UserGroupInformation based on the NameIdentifier and properties. + * + *

Note: As UserGroupInformation is a static class, to avoid the thread safety issue, we need + * to use synchronized to ensure the thread safety: Make login and getLoginUser atomic. + */ + private synchronized String initKerberos( String keytabPath, Map properties, Configuration configuration, @@ -435,7 +441,7 @@ public String initKerberos( try { KerberosClient kerberosClient = new KerberosClient(properties, configuration); - // Add the kerberos client to closable to close resources. + // Add the kerberos client to the closable to close resources. closeables.add(kerberosClient); File keytabFile = kerberosClient.saveKeyTabFileFromUri(keytabPath); @@ -462,7 +468,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map) @@ -596,8 +602,8 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty Map properties = Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap()); - // Reset current user based on the name identifier. - UserGroupInformation user = getCurrentUser(schemaEntity.properties(), ident); + // Reset the current user based on the name identifier. + UserGroupInformation user = getUGIByIdent(schemaEntity.properties(), ident); return user.doAs( (PrivilegedExceptionAction) @@ -745,7 +751,7 @@ void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) { this.proxyPlugin = hadoopProxyPlugin; } - public UserGroupInformation getCurrentUser(Map properties, NameIdentifier ident) { + private UserGroupInformation getUGIByIdent(Map properties, NameIdentifier ident) { KerberosConfig kerberosConfig = new KerberosConfig(properties); if (kerberosConfig.isKerberosAuth()) { // We assume that the realm of catalog is the same as the realm of the schema and table. @@ -754,18 +760,14 @@ public UserGroupInformation getCurrentUser(Map properties, NameI GRAVITINO_KEYTAB_FORMAT, catalogInfo.id() + "-" + ident.toString().replace(".", "-")); initKerberos(keytabPath, properties, new Configuration(), ident); } - + // If the kerberos is not enabled (Simple mode), we will use the current user return getUserBaseOnNameIdentifier(ident); } private UserGroupInformation getUserBaseOnNameIdentifier(NameIdentifier nameIdentifier) { UserInfo userInfo = getNearestUserGroupInformation(nameIdentifier); if (userInfo == null) { - try { - return UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new IllegalStateException("Fail to get currentUser", e); - } + return UserGroupInformation.createRemoteUser(PrincipalUtils.getCurrentUserName()); } UserGroupInformation ugi = userInfo.loginUser; From 89a643e373d4695b5418590941717a26ed53181a Mon Sep 17 00:00:00 2001 From: yuqi Date: Wed, 26 Jun 2024 15:42:23 +0800 Subject: [PATCH 05/15] fix test --- .../gravitino/catalog/hadoop/HadoopCatalogOperations.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index f0d4af31fed..b0e2fcc4912 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -767,7 +767,11 @@ private UserGroupInformation getUGIByIdent(Map properties, NameI private UserGroupInformation getUserBaseOnNameIdentifier(NameIdentifier nameIdentifier) { UserInfo userInfo = 
getNearestUserGroupInformation(nameIdentifier); if (userInfo == null) { - return UserGroupInformation.createRemoteUser(PrincipalUtils.getCurrentUserName()); + try { + return UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException("Failed to get login user", e); + } } UserGroupInformation ugi = userInfo.loginUser; From 32fc1d18d030d56cdad15a542c34f427bd4e3163 Mon Sep 17 00:00:00 2001 From: yuqi Date: Mon, 1 Jul 2024 20:22:48 +0800 Subject: [PATCH 06/15] fix --- .../catalog/hadoop/HadoopCatalog.java | 2 +- .../hadoop/HadoopCatalogOperations.java | 460 +++++++++--------- .../catalog/hadoop/HadoopProxyPlugin.java | 4 +- .../hadoop/SecureHadoopCatalogOperations.java | 271 +++++++++++ .../hadoop/TestHadoopCatalogOperations.java | 89 ++-- .../test/container/ContainerSuite.java | 1 + 6 files changed, 551 insertions(+), 276 deletions(-) create mode 100644 catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java index f6ae1142032..19acb5df5a9 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java @@ -36,7 +36,7 @@ public String shortName() { @Override protected CatalogOperations newOps(Map config) { - HadoopCatalogOperations ops = new HadoopCatalogOperations(); + CatalogOperations ops = new SecureHadoopCatalogOperations(); return ops; } diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index b0e2fcc4912..401414de00a 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -45,7 +45,6 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.security.PrivilegedExceptionAction; import java.time.Instant; import java.util.Collections; import java.util.List; @@ -92,6 +91,8 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-%s"; + private final ThreadLocal currentUser = ThreadLocal.withInitial(() -> null); + HadoopCatalogOperations(EntityStore store) { this.store = store; } @@ -104,6 +105,18 @@ public String getKerberosRealm() { return kerberosRealm; } + public EntityStore getStore() { + return store; + } + + public void setCurrentUser(String userName) { + currentUser.set(userName); + } + + public Map getUserInfoMap() { + return userInfoMap; + } + static class UserInfo { UserGroupInformation loginUser; boolean enableUserImpersonation; @@ -159,6 +172,13 @@ private void initAuthentication(Map conf, Configuration hadoopCo conf, hadoopConf, NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name())); + } else if (config.isSimpleAuth()) { + // If the catalog is simple authentication, set api login user as the current user + UserGroupInformation u = + UserGroupInformation.createRemoteUser(PrincipalUtils.getCurrentUserName()); + userInfoMap.put( + 
NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name()), + UserInfo.of(u, false, null)); } } @@ -211,121 +231,105 @@ public Fileset createFileset( Map properties) throws NoSuchSchemaException, FilesetAlreadyExistsException { + String apiUser = + currentUser.get() == null ? PrincipalUtils.getCurrentUserName() : currentUser.get(); + // Reset the current user based on the name identifier. - UserGroupInformation currentUser = getUGIByIdent(properties, ident); - String apiLoginUser = PrincipalUtils.getCurrentPrincipal().getName(); try { - return currentUser.doAs( - (PrivilegedExceptionAction) - () -> { - try { - if (store.exists(ident, Entity.EntityType.FILESET)) { - throw new FilesetAlreadyExistsException("Fileset %s already exists", ident); - } - } catch (IOException ioe) { - throw new RuntimeException( - "Failed to check if fileset " + ident + " exists", ioe); - } - - SchemaEntity schemaEntity; - NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels()); - try { - schemaEntity = - store.get(schemaIdent, Entity.EntityType.SCHEMA, SchemaEntity.class); - } catch (NoSuchEntityException exception) { - throw new NoSuchSchemaException( - exception, SCHEMA_DOES_NOT_EXIST_MSG, schemaIdent); - } catch (IOException ioe) { - throw new RuntimeException("Failed to load schema " + schemaIdent, ioe); - } - - // For external fileset, the storageLocation must be set. - if (type == Fileset.Type.EXTERNAL && StringUtils.isBlank(storageLocation)) { - throw new IllegalArgumentException( - "Storage location must be set for external fileset " + ident); - } - - // Either catalog property "location", or schema property "location", or - // storageLocation must be - // set for managed fileset. - Path schemaPath = getSchemaPath(schemaIdent.name(), schemaEntity.properties()); - if (schemaPath == null && StringUtils.isBlank(storageLocation)) { - throw new IllegalArgumentException( - "Storage location must be set for fileset " - + ident - + " when it's catalog and schema location are not set"); - } - - // The specified storageLocation will take precedence over the calculated one. - Path filesetPath = - StringUtils.isNotBlank(storageLocation) - ? new Path(storageLocation) - : new Path(schemaPath, ident.name()); - - try { - // formalize the path to avoid path without scheme, uri, authority, etc. - filesetPath = formalizePath(filesetPath, hadoopConf); - FileSystem fs = filesetPath.getFileSystem(hadoopConf); - if (!fs.exists(filesetPath)) { - if (!fs.mkdirs(filesetPath)) { - throw new RuntimeException( - "Failed to create fileset " + ident + " location " + filesetPath); - } - - LOG.info("Created fileset {} location {}", ident, filesetPath); - } else { - LOG.info("Fileset {} manages the existing location {}", ident, filesetPath); - } - - } catch (IOException ioe) { - throw new RuntimeException( - "Failed to create fileset " + ident + " location " + filesetPath, ioe); - } - - StringIdentifier stringId = StringIdentifier.fromProperties(properties); - Preconditions.checkArgument( - stringId != null, "Property String identifier should not be null"); - - FilesetEntity filesetEntity = - FilesetEntity.builder() - .withName(ident.name()) - .withId(stringId.id()) - .withNamespace(ident.namespace()) - .withComment(comment) - .withFilesetType(type) - // Store the storageLocation to the store. If the "storageLocation" is null - // for - // managed fileset, Gravitino will get and store the location based on the - // catalog/schema's location and store it to the store. 
- .withStorageLocation(filesetPath.toString()) - .withProperties(properties) - .withAuditInfo( - AuditInfo.builder() - .withCreator(apiLoginUser) - .withCreateTime(Instant.now()) - .build()) - .build(); - - try { - store.put(filesetEntity, true /* overwrite */); - } catch (IOException ioe) { - throw new RuntimeException("Failed to create fileset " + ident, ioe); - } - - return HadoopFileset.builder() - .withName(ident.name()) - .withComment(comment) - .withType(type) - .withStorageLocation(filesetPath.toString()) - .withProperties(filesetEntity.properties()) - .withAuditInfo(filesetEntity.auditInfo()) - .build(); - }); - } catch (InterruptedException e) { - throw new RuntimeException("Failed to create fileset " + ident, e); - } catch (IOException e) { - throw new RuntimeException(e); + if (store.exists(ident, Entity.EntityType.FILESET)) { + throw new FilesetAlreadyExistsException("Fileset %s already exists", ident); + } + } catch (IOException ioe) { + throw new RuntimeException("Failed to check if fileset " + ident + " exists", ioe); + } + + SchemaEntity schemaEntity; + NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels()); + try { + schemaEntity = store.get(schemaIdent, Entity.EntityType.SCHEMA, SchemaEntity.class); + } catch (NoSuchEntityException exception) { + throw new NoSuchSchemaException(exception, SCHEMA_DOES_NOT_EXIST_MSG, schemaIdent); + } catch (IOException ioe) { + throw new RuntimeException("Failed to load schema " + schemaIdent, ioe); + } + + // For external fileset, the storageLocation must be set. + if (type == Fileset.Type.EXTERNAL && StringUtils.isBlank(storageLocation)) { + throw new IllegalArgumentException( + "Storage location must be set for external fileset " + ident); + } + + // Either catalog property "location", or schema property "location", or + // storageLocation must be + // set for managed fileset. + Path schemaPath = getSchemaPath(schemaIdent.name(), schemaEntity.properties()); + if (schemaPath == null && StringUtils.isBlank(storageLocation)) { + throw new IllegalArgumentException( + "Storage location must be set for fileset " + + ident + + " when it's catalog and schema location are not set"); + } + + // The specified storageLocation will take precedence over the calculated one. + Path filesetPath = + StringUtils.isNotBlank(storageLocation) + ? new Path(storageLocation) + : new Path(schemaPath, ident.name()); + + try { + // formalize the path to avoid path without scheme, uri, authority, etc. + filesetPath = formalizePath(filesetPath, hadoopConf); + FileSystem fs = filesetPath.getFileSystem(hadoopConf); + if (!fs.exists(filesetPath)) { + if (!fs.mkdirs(filesetPath)) { + throw new RuntimeException( + "Failed to create fileset " + ident + " location " + filesetPath); + } + + LOG.info("Created fileset {} location {}", ident, filesetPath); + } else { + LOG.info("Fileset {} manages the existing location {}", ident, filesetPath); + } + + } catch (IOException ioe) { + throw new RuntimeException( + "Failed to create fileset " + ident + " location " + filesetPath, ioe); + } + + StringIdentifier stringId = StringIdentifier.fromProperties(properties); + Preconditions.checkArgument(stringId != null, "Property String identifier should not be null"); + + FilesetEntity filesetEntity = + FilesetEntity.builder() + .withName(ident.name()) + .withId(stringId.id()) + .withNamespace(ident.namespace()) + .withComment(comment) + .withFilesetType(type) + // Store the storageLocation to the store. 
If the "storageLocation" is null + // for + // managed fileset, Gravitino will get and store the location based on the + // catalog/schema's location and store it to the store. + .withStorageLocation(filesetPath.toString()) + .withProperties(properties) + .withAuditInfo( + AuditInfo.builder().withCreator(apiUser).withCreateTime(Instant.now()).build()) + .build(); + + try { + store.put(filesetEntity, true /* overwrite */); + } catch (IOException ioe) { + throw new RuntimeException("Failed to create fileset " + ident, ioe); } + + return HadoopFileset.builder() + .withName(ident.name()) + .withComment(comment) + .withType(type) + .withStorageLocation(filesetPath.toString()) + .withProperties(filesetEntity.properties()) + .withAuditInfo(filesetEntity.auditInfo()) + .build(); } @Override @@ -375,34 +379,27 @@ public boolean dropFileset(NameIdentifier ident) { store.get(ident, Entity.EntityType.FILESET, FilesetEntity.class); Path filesetPath = new Path(filesetEntity.storageLocation()); - // Reset the current user based on the name identifier. - UserGroupInformation currentUser = getUGIByIdent(filesetEntity.properties(), ident); - - return currentUser.doAs( - (PrivilegedExceptionAction) - () -> { - // For managed fileset, we should delete the related files. - if (filesetEntity.filesetType() == Fileset.Type.MANAGED) { - FileSystem fs = filesetPath.getFileSystem(hadoopConf); - if (fs.exists(filesetPath)) { - if (!fs.delete(filesetPath, true)) { - LOG.warn("Failed to delete fileset {} location {}", ident, filesetPath); - return false; - } - - } else { - LOG.warn("Fileset {} location {} does not exist", ident, filesetPath); - } - } - - boolean success = store.delete(ident, Entity.EntityType.FILESET); - cleanUserInfo(ident); - return success; - }); + // For managed fileset, we should delete the related files. + if (filesetEntity.filesetType() == Fileset.Type.MANAGED) { + FileSystem fs = filesetPath.getFileSystem(hadoopConf); + if (fs.exists(filesetPath)) { + if (!fs.delete(filesetPath, true)) { + LOG.warn("Failed to delete fileset {} location {}", ident, filesetPath); + return false; + } + + } else { + LOG.warn("Fileset {} location {} does not exist", ident, filesetPath); + } + } + + boolean success = store.delete(ident, Entity.EntityType.FILESET); + cleanUserInfo(ident); + return success; } catch (NoSuchEntityException ne) { LOG.warn("Fileset {} does not exist", ident); return false; - } catch (IOException | InterruptedException e) { + } catch (IOException e) { throw new RuntimeException("Failed to delete fileset " + ident, e); } } @@ -426,7 +423,7 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc *
Note: As UserGroupInformation is a static class, to avoid the thread safety issue, we need * to use synchronized to ensure the thread safety: Make login and getLoginUser atomic. */ - private synchronized String initKerberos( + public synchronized String initKerberos( String keytabPath, Map properties, Configuration configuration, @@ -466,75 +463,63 @@ private synchronized String initKerberos( public Schema createSchema(NameIdentifier ident, String comment, Map properties) throws NoSuchCatalogException, SchemaAlreadyExistsException { - String apiLoginUser = PrincipalUtils.getCurrentPrincipal().getName(); - // Reset the current user based on the name identifier and properties. - UserGroupInformation currentUser = getUGIByIdent(properties, ident); + String apiUser = + currentUser.get() == null ? PrincipalUtils.getCurrentUserName() : currentUser.get(); + try { - return currentUser.doAs( - (PrivilegedExceptionAction) - () -> { - try { - if (store.exists(ident, Entity.EntityType.SCHEMA)) { - throw new SchemaAlreadyExistsException("Schema %s already exists", ident); - } - } catch (IOException ioe) { - throw new RuntimeException("Failed to check if schema " + ident + " exists", ioe); - } - - Path schemaPath = getSchemaPath(ident.name(), properties); - if (schemaPath != null) { - try { - FileSystem fs = schemaPath.getFileSystem(hadoopConf); - if (!fs.exists(schemaPath)) { - if (!fs.mkdirs(schemaPath)) { - // Fail the operation when failed to create the schema path. - throw new RuntimeException( - "Failed to create schema " + ident + " location " + schemaPath); - } - LOG.info("Created schema {} location {}", ident, schemaPath); - } else { - LOG.info("Schema {} manages the existing location {}", ident, schemaPath); - } - - } catch (IOException ioe) { - throw new RuntimeException( - "Failed to create schema " + ident + " location " + schemaPath, ioe); - } - } - - StringIdentifier stringId = StringIdentifier.fromProperties(properties); - Preconditions.checkNotNull( - stringId, "Property String identifier should not be null"); - - SchemaEntity schemaEntity = - SchemaEntity.builder() - .withName(ident.name()) - .withId(stringId.id()) - .withNamespace(ident.namespace()) - .withComment(comment) - .withProperties(properties) - .withAuditInfo( - AuditInfo.builder() - .withCreator(apiLoginUser) - .withCreateTime(Instant.now()) - .build()) - .build(); - try { - store.put(schemaEntity, true /* overwrite */); - } catch (IOException ioe) { - throw new RuntimeException("Failed to create schema " + ident, ioe); - } - - return HadoopSchema.builder() - .withName(ident.name()) - .withComment(comment) - .withProperties(schemaEntity.properties()) - .withAuditInfo(schemaEntity.auditInfo()) - .build(); - }); - } catch (IOException | InterruptedException e) { - throw new RuntimeException("Failed to create schema " + ident, e); + if (store.exists(ident, Entity.EntityType.SCHEMA)) { + throw new SchemaAlreadyExistsException("Schema %s already exists", ident); + } + } catch (IOException ioe) { + throw new RuntimeException("Failed to check if schema " + ident + " exists", ioe); + } + + Path schemaPath = getSchemaPath(ident.name(), properties); + if (schemaPath != null) { + try { + FileSystem fs = schemaPath.getFileSystem(hadoopConf); + if (!fs.exists(schemaPath)) { + if (!fs.mkdirs(schemaPath)) { + // Fail the operation when failed to create the schema path. 
+ throw new RuntimeException( + "Failed to create schema " + ident + " location " + schemaPath); + } + LOG.info("Created schema {} location {}", ident, schemaPath); + } else { + LOG.info("Schema {} manages the existing location {}", ident, schemaPath); + } + + } catch (IOException ioe) { + throw new RuntimeException( + "Failed to create schema " + ident + " location " + schemaPath, ioe); + } } + + StringIdentifier stringId = StringIdentifier.fromProperties(properties); + Preconditions.checkNotNull(stringId, "Property String identifier should not be null"); + + SchemaEntity schemaEntity = + SchemaEntity.builder() + .withName(ident.name()) + .withId(stringId.id()) + .withNamespace(ident.namespace()) + .withComment(comment) + .withProperties(properties) + .withAuditInfo( + AuditInfo.builder().withCreator(apiUser).withCreateTime(Instant.now()).build()) + .build(); + try { + store.put(schemaEntity, true /* overwrite */); + } catch (IOException ioe) { + throw new RuntimeException("Failed to create schema " + ident, ioe); + } + + return HadoopSchema.builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(schemaEntity.properties()) + .withAuditInfo(schemaEntity.auditInfo()) + .build(); } @Override @@ -602,37 +587,30 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty Map properties = Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap()); - // Reset the current user based on the name identifier. - UserGroupInformation user = getUGIByIdent(schemaEntity.properties(), ident); - - return user.doAs( - (PrivilegedExceptionAction) - () -> { - Path schemaPath = getSchemaPath(ident.name(), properties); - // Nothing to delete if the schema path is not set. - if (schemaPath == null) { - return false; - } - - FileSystem fs = schemaPath.getFileSystem(hadoopConf); - // Nothing to delete if the schema path does not exist. - if (!fs.exists(schemaPath)) { - return false; - } - - if (fs.listStatus(schemaPath).length > 0 && !cascade) { - throw new NonEmptySchemaException( - "Schema %s with location %s is not empty", ident, schemaPath); - } else { - fs.delete(schemaPath, true); - } - - cleanUserInfo(ident); - LOG.info("Deleted schema {} location {}", ident, schemaPath); - return true; - }); - } catch (IOException | InterruptedException e) { - throw new RuntimeException("Failed to delete schema " + ident + " location", e); + Path schemaPath = getSchemaPath(ident.name(), properties); + // Nothing to delete if the schema path is not set. + if (schemaPath == null) { + return false; + } + + FileSystem fs = schemaPath.getFileSystem(hadoopConf); + // Nothing to delete if the schema path does not exist. 
+ if (!fs.exists(schemaPath)) { + return false; + } + + if (fs.listStatus(schemaPath).length > 0 && !cascade) { + throw new NonEmptySchemaException( + "Schema %s with location %s is not empty", ident, schemaPath); + } else { + fs.delete(schemaPath, true); + } + + cleanUserInfo(ident); + LOG.info("Deleted schema {} location {}", ident, schemaPath); + return true; + } catch (IOException ioe) { + throw new RuntimeException("Failed to delete schema " + ident + " location", ioe); } } @@ -751,7 +729,7 @@ void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) { this.proxyPlugin = hadoopProxyPlugin; } - private UserGroupInformation getUGIByIdent(Map properties, NameIdentifier ident) { + UserGroupInformation getUGIByIdent(Map properties, NameIdentifier ident) { KerberosConfig kerberosConfig = new KerberosConfig(properties); if (kerberosConfig.isKerberosAuth()) { // We assume that the realm of catalog is the same as the realm of the schema and table. @@ -767,11 +745,7 @@ private UserGroupInformation getUGIByIdent(Map properties, NameI private UserGroupInformation getUserBaseOnNameIdentifier(NameIdentifier nameIdentifier) { UserInfo userInfo = getNearestUserGroupInformation(nameIdentifier); if (userInfo == null) { - try { - return UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new RuntimeException("Failed to get login user", e); - } + throw new RuntimeException("Failed to get get current user: " + nameIdentifier); } UserGroupInformation ugi = userInfo.loginUser; diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java index 72f8a580520..b6141acab10 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java @@ -18,7 +18,7 @@ import org.apache.hadoop.security.UserGroupInformation; public class HadoopProxyPlugin implements ProxyPlugin { - private HadoopCatalogOperations ops; + private SecureHadoopCatalogOperations ops; private final UserGroupInformation realUser; public HadoopProxyPlugin() { @@ -68,7 +68,7 @@ public Object doAs( @Override public void bindCatalogOperation(CatalogOperations ops) { - this.ops = ((HadoopCatalogOperations) ops); + this.ops = ((SecureHadoopCatalogOperations) ops); this.ops.setProxyPlugin(this); } } diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java new file mode 100644 index 00000000000..f20a374c8d9 --- /dev/null +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java @@ -0,0 +1,271 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.catalog.hadoop; + +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.EntityStore; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.Schema; +import com.datastrato.gravitino.SchemaChange; +import com.datastrato.gravitino.catalog.hadoop.HadoopCatalogOperations.UserInfo; +import com.datastrato.gravitino.connector.CatalogInfo; +import com.datastrato.gravitino.connector.CatalogOperations; +import com.datastrato.gravitino.connector.HasPropertyMetadata; +import com.datastrato.gravitino.connector.SupportsSchemas; +import com.datastrato.gravitino.exceptions.FilesetAlreadyExistsException; +import com.datastrato.gravitino.exceptions.NoSuchCatalogException; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.exceptions.NoSuchFilesetException; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.NonEmptySchemaException; +import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; +import com.datastrato.gravitino.file.Fileset; +import com.datastrato.gravitino.file.FilesetCatalog; +import com.datastrato.gravitino.file.FilesetChange; +import com.datastrato.gravitino.meta.FilesetEntity; +import com.datastrato.gravitino.meta.SchemaEntity; +import com.datastrato.gravitino.utils.PrincipalUtils; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * SecureHadoopCatalogOperations is a secure version of HadoopCatalogOperations that can manage + * Schema and fileset level of user authentication. + */ +public class SecureHadoopCatalogOperations + implements CatalogOperations, SupportsSchemas, FilesetCatalog { + + public static final Logger LOG = LoggerFactory.getLogger(SecureHadoopCatalogOperations.class); + + private final HadoopCatalogOperations hadoopCatalogOperations; + + public SecureHadoopCatalogOperations() { + this.hadoopCatalogOperations = new HadoopCatalogOperations(); + } + + public SecureHadoopCatalogOperations(EntityStore store) { + this.hadoopCatalogOperations = new HadoopCatalogOperations(store); + } + + public HadoopCatalogOperations getHadoopCatalogOperations() { + return hadoopCatalogOperations; + } + + public String getKerberosRealm() { + return hadoopCatalogOperations.getKerberosRealm(); + } + + public void setProxyPlugin(HadoopProxyPlugin plugin) { + hadoopCatalogOperations.setProxyPlugin(plugin); + } + + // We have overridden the createFileset, dropFileset, createSchema, dropSchema method to reset + // the current user based on the name identifier. + + @Override + public Fileset createFileset( + NameIdentifier ident, + String comment, + Fileset.Type type, + String storageLocation, + Map properties) + throws NoSuchSchemaException, FilesetAlreadyExistsException { + + // Why I need to do this? When we call getUGIByIdent, `PrincipalUtils.getCurrentUserName()` is + // Not the api user, but the Kerberos principal name. 
+ String apiUser = PrincipalUtils.getCurrentUserName(); + hadoopCatalogOperations.setCurrentUser(apiUser); + + UserGroupInformation currentUser = hadoopCatalogOperations.getUGIByIdent(properties, ident); + try { + return currentUser.doAs( + (PrivilegedExceptionAction) + () -> + hadoopCatalogOperations.createFileset( + ident, comment, type, storageLocation, properties)); + } catch (IOException | InterruptedException ioe) { + throw new RuntimeException("Failed to create fileset " + ident, ioe); + } catch (UndeclaredThrowableException e) { + Throwable innerException = e.getCause(); + if (innerException instanceof PrivilegedActionException) { + throw new RuntimeException(innerException.getCause()); + } else if (innerException instanceof InvocationTargetException) { + throw new RuntimeException(innerException.getCause()); + } else { + throw new RuntimeException(innerException); + } + } finally { + hadoopCatalogOperations.setCurrentUser(null); + } + } + + @Override + public boolean dropFileset(NameIdentifier ident) { + + FilesetEntity filesetEntity; + try { + filesetEntity = + hadoopCatalogOperations + .getStore() + .get(ident, Entity.EntityType.FILESET, FilesetEntity.class); + } catch (NoSuchEntityException e) { + LOG.warn("Fileset {} does not exist", ident); + return false; + } catch (IOException ioe) { + throw new RuntimeException("Failed to delete fileset " + ident, ioe); + } + + // Reset the current user based on the name identifier. + UserGroupInformation currentUser = + hadoopCatalogOperations.getUGIByIdent(filesetEntity.properties(), ident); + + try { + return currentUser.doAs( + (PrivilegedExceptionAction) () -> hadoopCatalogOperations.dropFileset(ident)); + } catch (IOException | InterruptedException ioe) { + throw new RuntimeException("Failed to create fileset " + ident, ioe); + } catch (UndeclaredThrowableException e) { + Throwable innerException = e.getCause(); + if (innerException instanceof PrivilegedActionException) { + throw new RuntimeException(innerException.getCause()); + } else if (innerException instanceof InvocationTargetException) { + throw new RuntimeException(innerException.getCause()); + } else { + throw new RuntimeException(innerException); + } + } + } + + @Override + public Schema createSchema(NameIdentifier ident, String comment, Map properties) + throws NoSuchCatalogException, SchemaAlreadyExistsException { + + try { + String apiUser = PrincipalUtils.getCurrentUserName(); + hadoopCatalogOperations.setCurrentUser(apiUser); + // Reset the current user based on the name identifier and properties. 
+ UserGroupInformation currentUser = hadoopCatalogOperations.getUGIByIdent(properties, ident); + + return currentUser.doAs( + (PrivilegedExceptionAction) + () -> hadoopCatalogOperations.createSchema(ident, comment, properties)); + } catch (IOException | InterruptedException ioe) { + throw new RuntimeException("Failed to create fileset " + ident, ioe); + } catch (UndeclaredThrowableException e) { + Throwable innerException = e.getCause(); + if (innerException instanceof PrivilegedActionException) { + throw new RuntimeException(innerException.getCause()); + } else if (innerException instanceof InvocationTargetException) { + throw new RuntimeException(innerException.getCause()); + } else { + throw new RuntimeException(innerException); + } + } finally { + hadoopCatalogOperations.setCurrentUser(null); + } + } + + @Override + public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { + try { + SchemaEntity schemaEntity = + hadoopCatalogOperations + .getStore() + .get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class); + Map properties = + Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap()); + + // Reset the current user based on the name identifier. + UserGroupInformation user = hadoopCatalogOperations.getUGIByIdent(properties, ident); + + return user.doAs( + (PrivilegedExceptionAction) + () -> hadoopCatalogOperations.dropSchema(ident, cascade)); + } catch (IOException | InterruptedException ioe) { + throw new RuntimeException("Failed to create fileset " + ident, ioe); + } catch (UndeclaredThrowableException e) { + Throwable innerException = e.getCause(); + if (innerException instanceof PrivilegedActionException) { + throw new RuntimeException(innerException.getCause()); + } else if (innerException instanceof InvocationTargetException) { + throw new RuntimeException(innerException.getCause()); + } else { + throw new RuntimeException(innerException); + } + } + } + + @Override + public void initialize( + Map config, CatalogInfo info, HasPropertyMetadata propertiesMetadata) + throws RuntimeException { + hadoopCatalogOperations.initialize(config, info, propertiesMetadata); + } + + @Override + public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes) + throws NoSuchFilesetException, IllegalArgumentException { + try { + return hadoopCatalogOperations.alterFileset(ident, changes); + } finally { + String finalName = ident.name(); + for (FilesetChange change : changes) { + if (change instanceof FilesetChange.RenameFileset) { + finalName = ((FilesetChange.RenameFileset) change).getNewName(); + } + } + if (!ident.name().equals(finalName)) { + UserInfo userInfo = hadoopCatalogOperations.getUserInfoMap().remove(ident); + if (userInfo != null) { + hadoopCatalogOperations + .getUserInfoMap() + .put(NameIdentifier.of(ident.namespace(), finalName), userInfo); + } + } + } + } + + @Override + public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { + return hadoopCatalogOperations.listSchemas(namespace); + } + + @Override + public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { + return hadoopCatalogOperations.loadSchema(ident); + } + + @Override + public Schema alterSchema(NameIdentifier ident, SchemaChange... 
changes) + throws NoSuchSchemaException { + return hadoopCatalogOperations.alterSchema(ident, changes); + } + + @Override + public NameIdentifier[] listFilesets(Namespace namespace) throws NoSuchSchemaException { + return hadoopCatalogOperations.listFilesets(namespace); + } + + @Override + public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException { + return hadoopCatalogOperations.loadFileset(ident); + } + + @Override + public void close() throws IOException { + hadoopCatalogOperations.close(); + } +} diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java index f4e4a5cb59a..36847995657 100644 --- a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java @@ -25,6 +25,7 @@ import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.SchemaChange; import com.datastrato.gravitino.StringIdentifier; +import com.datastrato.gravitino.connector.CatalogInfo; import com.datastrato.gravitino.connector.HasPropertyMetadata; import com.datastrato.gravitino.connector.PropertiesMetadata; import com.datastrato.gravitino.exceptions.NoSuchFilesetException; @@ -101,6 +102,30 @@ public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationE private static IdGenerator idGenerator; + private static CatalogInfo randomCatalogInfo() { + return new CatalogInfo( + idGenerator.nextId(), + "catalog1", + CatalogInfo.Type.FILESET, + "provider1", + "comment1", + Maps.newHashMap(), + null, + Namespace.of("m1", "c1")); + } + + private static CatalogInfo randomCatalogInfo(String metalakeName, String catalogName) { + return new CatalogInfo( + idGenerator.nextId(), + catalogName, + CatalogInfo.Type.FILESET, + "hadoop", + "comment1", + Maps.newHashMap(), + null, + Namespace.of(metalakeName)); + } + @BeforeAll public static void setUp() { Config config = Mockito.mock(Config.class); @@ -131,14 +156,18 @@ public static void tearDown() throws IOException { @Test public void testHadoopCatalogConfiguration() { Map emptyProps = Maps.newHashMap(); - HadoopCatalogOperations ops = new HadoopCatalogOperations(store); - ops.initialize(emptyProps, null, HADOOP_PROPERTIES_METADATA); + SecureHadoopCatalogOperations secOps = new SecureHadoopCatalogOperations(store); + + HadoopCatalogOperations ops = secOps.getHadoopCatalogOperations(); + + CatalogInfo catalogInfo = randomCatalogInfo(); + ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA); Configuration conf = ops.hadoopConf; String value = conf.get("fs.defaultFS"); Assertions.assertEquals("file:///", value); emptyProps.put(CATALOG_BYPASS_PREFIX + "fs.defaultFS", "hdfs://localhost:9000"); - ops.initialize(emptyProps, null, HADOOP_PROPERTIES_METADATA); + ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA); Configuration conf1 = ops.hadoopConf; String value1 = conf1.get("fs.defaultFS"); Assertions.assertEquals("hdfs://localhost:9000", value1); @@ -146,7 +175,7 @@ public void testHadoopCatalogConfiguration() { Assertions.assertFalse(ops.catalogStorageLocation.isPresent()); emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, "file:///tmp/catalog"); - ops.initialize(emptyProps, null, HADOOP_PROPERTIES_METADATA); + ops.initialize(emptyProps, catalogInfo, 
HADOOP_PROPERTIES_METADATA); Assertions.assertTrue(ops.catalogStorageLocation.isPresent()); Path expectedPath = new Path("file:///tmp/catalog"); Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get()); @@ -226,8 +255,8 @@ public void testLoadSchema() throws IOException { Assertions.assertEquals(name, schema.name()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA); Schema schema1 = ops.loadSchema(NameIdentifierUtil.ofSchema("m1", "c1", name)); Assertions.assertEquals(name, schema1.name()); Assertions.assertEquals(comment, schema1.comment()); @@ -250,8 +279,8 @@ public void testListSchema() throws IOException { createSchema(name, comment, null, null); createSchema(name1, comment1, null, null); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA); Set idents = Arrays.stream(ops.listSchemas(Namespace.of("m1", "c1"))).collect(Collectors.toSet()); Assertions.assertTrue(idents.size() >= 2); @@ -268,8 +297,8 @@ public void testAlterSchema() throws IOException { Schema schema = createSchema(name, comment, catalogPath, null); Assertions.assertEquals(name, schema.name()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA); Schema schema1 = ops.loadSchema(NameIdentifierUtil.ofSchema("m1", "c1", name)); Assertions.assertEquals(name, schema1.name()); Assertions.assertEquals(comment, schema1.comment()); @@ -315,10 +344,10 @@ public void testDropSchema() throws IOException { Assertions.assertEquals(name, schema.name()); NameIdentifier id = NameIdentifierUtil.ofSchema("m1", "c1", name); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { ops.initialize( ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath), - null, + randomCatalogInfo("m1", "c1"), HADOOP_PROPERTIES_METADATA); Schema schema1 = ops.loadSchema(id); Assertions.assertEquals(name, schema1.name()); @@ -372,8 +401,8 @@ public void testCreateLoadAndDeleteFilesetWithLocations( } NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", schemaName); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(catalogProps, null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(catalogProps, randomCatalogInfo("m1", "c1"), HADOOP_PROPERTIES_METADATA); if (!ops.schemaExists(schemaIdent)) { createSchema(schemaName, comment, catalogPath, schemaPath); } @@ -427,8 +456,8 @@ public void testCreateFilesetWithExceptions() throws IOException { + " when it's catalog and schema " + "location are not set", exception.getMessage()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - 
ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA); Throwable e = Assertions.assertThrows( NoSuchFilesetException.class, () -> ops.loadFileset(filesetIdent)); @@ -443,8 +472,8 @@ public void testCreateFilesetWithExceptions() throws IOException { Assertions.assertEquals( "Storage location must be set for external fileset " + filesetIdent, exception1.getMessage()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA); Throwable e = Assertions.assertThrows( NoSuchFilesetException.class, () -> ops.loadFileset(filesetIdent)); @@ -463,8 +492,8 @@ public void testListFilesets() throws IOException { createFileset(fileset, schemaName, comment, Fileset.Type.MANAGED, null, null); } - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA); Set idents = Arrays.stream(ops.listFilesets(Namespace.of("m1", "c1", schemaName))) .collect(Collectors.toSet()); @@ -494,8 +523,8 @@ public void testRenameFileset( } NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", schemaName); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(catalogProps, null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(catalogProps, randomCatalogInfo("m1", "c1"), HADOOP_PROPERTIES_METADATA); if (!ops.schemaExists(schemaIdent)) { createSchema(schemaName, comment, catalogPath, schemaPath); } @@ -540,8 +569,8 @@ public void testAlterFilesetProperties() throws IOException { FilesetChange change1 = FilesetChange.setProperty("k1", "v1"); FilesetChange change2 = FilesetChange.removeProperty("k1"); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA); NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name); Fileset fileset1 = ops.alterFileset(filesetIdent, change1); @@ -605,8 +634,8 @@ public void testUpdateFilesetComment() throws IOException { Fileset fileset = createFileset(name, schemaName, comment, Fileset.Type.MANAGED, null, null); FilesetChange change1 = FilesetChange.updateComment("comment26_new"); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(Maps.newHashMap(), null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA); NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name); Fileset fileset1 = ops.alterFileset(filesetIdent, change1); @@ -857,8 +886,8 @@ private Schema createSchema(String name, String 
comment, String catalogPath, Str props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath); } - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(props, null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(props, randomCatalogInfo("m1", "c1"), HADOOP_PROPERTIES_METADATA); NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", name); Map schemaProps = Maps.newHashMap(); @@ -886,8 +915,8 @@ private Fileset createFileset( props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath); } - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { - ops.initialize(props, null, HADOOP_PROPERTIES_METADATA); + try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) { + ops.initialize(props, randomCatalogInfo("m1", "c1"), HADOOP_PROPERTIES_METADATA); NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name); Map filesetProps = Maps.newHashMap(); diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java index 76bd402310c..6781d756e36 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java @@ -98,6 +98,7 @@ public void startHiveContainer() { .withNetwork(network); HiveContainer container = closer.register(hiveBuilder.build()); container.start(); + container.executeInContainer("hadoop", "fs", "-chown", "-R", "anonymous", "/user/"); hiveContainer = container; } } From 3fca806ace03a3b6a8e706a4f659d485d7990ca4 Mon Sep 17 00:00:00 2001 From: yuqi Date: Mon, 1 Jul 2024 21:23:25 +0800 Subject: [PATCH 07/15] fix --- catalogs/catalog-hadoop/build.gradle.kts | 1 - .../hadoop/HadoopCatalogOperations.java | 120 ++++-------------- .../hadoop/SecureHadoopCatalogOperations.java | 103 +++++++++++++-- .../test/HadoopUserAuthenticationIT.java | 32 ++--- .../test/container/ContainerSuite.java | 4 +- 5 files changed, 131 insertions(+), 129 deletions(-) diff --git a/catalogs/catalog-hadoop/build.gradle.kts b/catalogs/catalog-hadoop/build.gradle.kts index 8b881bde943..41bf4de6dad 100644 --- a/catalogs/catalog-hadoop/build.gradle.kts +++ b/catalogs/catalog-hadoop/build.gradle.kts @@ -15,7 +15,6 @@ dependencies { implementation(project(":core")) implementation(project(":common")) - implementation(libs.cglib) implementation(libs.guava) implementation(libs.hadoop3.common) { exclude("com.sun.jersey") diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index 401414de00a..f3bd613dab0 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -121,13 +121,18 @@ static class UserInfo { UserGroupInformation loginUser; boolean enableUserImpersonation; String keytabPath; + String realm; static UserInfo of( - UserGroupInformation loginUser, boolean enableUserImpersonation, String keytabPath) { + UserGroupInformation loginUser, 
+ boolean enableUserImpersonation, + String keytabPath, + String kerberosRealm) { UserInfo userInfo = new UserInfo(); userInfo.loginUser = loginUser; userInfo.enableUserImpersonation = enableUserImpersonation; userInfo.keytabPath = keytabPath; + userInfo.realm = kerberosRealm; return userInfo; } } @@ -165,20 +170,18 @@ private void initAuthentication(Map conf, Configuration hadoopCo AuthenticationConfig config = new AuthenticationConfig(conf); if (config.isKerberosAuth()) { - String catalogKeyTablePath = String.format(GRAVITINO_KEYTAB_FORMAT, catalogInfo.id()); this.kerberosRealm = initKerberos( - catalogKeyTablePath, - conf, - hadoopConf, - NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name())); + conf, hadoopConf, NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name())); } else if (config.isSimpleAuth()) { - // If the catalog is simple authentication, set api login user as the current user - UserGroupInformation u = - UserGroupInformation.createRemoteUser(PrincipalUtils.getCurrentUserName()); - userInfoMap.put( - NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name()), - UserInfo.of(u, false, null)); + // TODO: change the user 'datastrato' to 'anonymous' and uncomment the following code; + // uncomment the following code after the user 'datastrato' is removed from the codebase. + // for more, please see https://github.com/datastrato/gravitino/issues/4013 + // UserGroupInformation u = + // UserGroupInformation.createRemoteUser(PrincipalUtils.getCurrentUserName()); + // userInfoMap.put( + // NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name()), + // UserInfo.of(u, false, null, null)); } } @@ -260,8 +263,7 @@ public Fileset createFileset( } // Either catalog property "location", or schema property "location", or - // storageLocation must be - // set for managed fileset. + // storageLocation must be set for managed fileset. Path schemaPath = getSchemaPath(schemaIdent.name(), schemaEntity.properties()); if (schemaPath == null && StringUtils.isBlank(storageLocation)) { throw new IllegalArgumentException( @@ -373,7 +375,6 @@ public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes) @Override public boolean dropFileset(NameIdentifier ident) { - try { FilesetEntity filesetEntity = store.get(ident, Entity.EntityType.FILESET, FilesetEntity.class); @@ -393,14 +394,12 @@ public boolean dropFileset(NameIdentifier ident) { } } - boolean success = store.delete(ident, Entity.EntityType.FILESET); - cleanUserInfo(ident); - return success; + return store.delete(ident, Entity.EntityType.FILESET); } catch (NoSuchEntityException ne) { LOG.warn("Fileset {} does not exist", ident); return false; - } catch (IOException e) { - throw new RuntimeException("Failed to delete fileset " + ident, e); + } catch (IOException ioe) { + throw new RuntimeException("Failed to delete fileset " + ident, ioe); } } @@ -424,11 +423,11 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc * to use synchronized to ensure the thread safety: Make login and getLoginUser atomic. */ public synchronized String initKerberos( - String keytabPath, - Map properties, - Configuration configuration, - NameIdentifier ident) { + Map properties, Configuration configuration, NameIdentifier ident) { // Init schema level kerberos authentication. 
+ String keytabPath = + String.format( + GRAVITINO_KEYTAB_FORMAT, catalogInfo.id() + "-" + ident.toString().replace(".", "-")); KerberosConfig kerberosConfig = new KerberosConfig(properties); if (kerberosConfig.isKerberosAuth()) { configuration.set( @@ -449,7 +448,8 @@ public synchronized String initKerberos( UserInfo.of( UserGroupInformation.getLoginUser(), kerberosConfig.isImpersonationEnabled(), - keytabPath)); + keytabPath, + kerberosRealm)); return kerberosRealm; } catch (IOException e) { throw new RuntimeException("Failed to login with Kerberos", e); @@ -606,7 +606,6 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty fs.delete(schemaPath, true); } - cleanUserInfo(ident); LOG.info("Deleted schema {} location {}", ident, schemaPath); return true; } catch (IOException ioe) { @@ -728,73 +727,4 @@ static Path formalizePath(Path path, Configuration configuration) throws IOExcep void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) { this.proxyPlugin = hadoopProxyPlugin; } - - UserGroupInformation getUGIByIdent(Map properties, NameIdentifier ident) { - KerberosConfig kerberosConfig = new KerberosConfig(properties); - if (kerberosConfig.isKerberosAuth()) { - // We assume that the realm of catalog is the same as the realm of the schema and table. - String keytabPath = - String.format( - GRAVITINO_KEYTAB_FORMAT, catalogInfo.id() + "-" + ident.toString().replace(".", "-")); - initKerberos(keytabPath, properties, new Configuration(), ident); - } - // If the kerberos is not enabled (Simple mode), we will use the current user - return getUserBaseOnNameIdentifier(ident); - } - - private UserGroupInformation getUserBaseOnNameIdentifier(NameIdentifier nameIdentifier) { - UserInfo userInfo = getNearestUserGroupInformation(nameIdentifier); - if (userInfo == null) { - throw new RuntimeException("Failed to get get current user: " + nameIdentifier); - } - - UserGroupInformation ugi = userInfo.loginUser; - boolean userImpersonation = userInfo.enableUserImpersonation; - if (userImpersonation) { - String proxyKerberosPrincipalName = PrincipalUtils.getCurrentUserName(); - if (!proxyKerberosPrincipalName.contains("@")) { - proxyKerberosPrincipalName = - String.format("%s@%s", proxyKerberosPrincipalName, kerberosRealm); - } - - ugi = UserGroupInformation.createProxyUser(proxyKerberosPrincipalName, ugi); - } - - return ugi; - } - - private UserInfo getNearestUserGroupInformation(NameIdentifier nameIdentifier) { - NameIdentifier currentNameIdentifier = nameIdentifier; - while (currentNameIdentifier != null) { - if (userInfoMap.containsKey(currentNameIdentifier)) { - return userInfoMap.get(currentNameIdentifier); - } - - String[] levels = currentNameIdentifier.namespace().levels(); - // The ident is catalog level. 
- if (levels.length <= 1) { - return null; - } - currentNameIdentifier = NameIdentifier.of(currentNameIdentifier.namespace().levels()); - } - return null; - } - - private void cleanUserInfo(NameIdentifier identifier) { - UserInfo userInfo = userInfoMap.remove(identifier); - if (userInfo != null) { - removeFile(userInfo.keytabPath); - } - } - - private void removeFile(String filePath) { - if (filePath == null) { - return; - } - - File file = new File(filePath); - if (file.exists()) { - file.delete(); - } - } } diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java index f20a374c8d9..2c294dfba69 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java @@ -12,6 +12,7 @@ import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.SchemaChange; import com.datastrato.gravitino.catalog.hadoop.HadoopCatalogOperations.UserInfo; +import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig; import com.datastrato.gravitino.connector.CatalogInfo; import com.datastrato.gravitino.connector.CatalogOperations; import com.datastrato.gravitino.connector.HasPropertyMetadata; @@ -29,6 +30,7 @@ import com.datastrato.gravitino.meta.FilesetEntity; import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.utils.PrincipalUtils; +import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.UndeclaredThrowableException; @@ -37,6 +39,7 @@ import java.util.Collections; import java.util.Map; import java.util.Optional; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +92,7 @@ public Fileset createFileset( String apiUser = PrincipalUtils.getCurrentUserName(); hadoopCatalogOperations.setCurrentUser(apiUser); - UserGroupInformation currentUser = hadoopCatalogOperations.getUGIByIdent(properties, ident); + UserGroupInformation currentUser = getUGIByIdent(properties, ident); try { return currentUser.doAs( (PrivilegedExceptionAction) @@ -129,12 +132,15 @@ public boolean dropFileset(NameIdentifier ident) { } // Reset the current user based on the name identifier. - UserGroupInformation currentUser = - hadoopCatalogOperations.getUGIByIdent(filesetEntity.properties(), ident); + UserGroupInformation currentUser = getUGIByIdent(filesetEntity.properties(), ident); try { - return currentUser.doAs( - (PrivilegedExceptionAction) () -> hadoopCatalogOperations.dropFileset(ident)); + boolean r = + currentUser.doAs( + (PrivilegedExceptionAction) + () -> hadoopCatalogOperations.dropFileset(ident)); + cleanUserInfo(ident); + return r; } catch (IOException | InterruptedException ioe) { throw new RuntimeException("Failed to create fileset " + ident, ioe); } catch (UndeclaredThrowableException e) { @@ -157,7 +163,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map) @@ -189,11 +195,15 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap()); // Reset the current user based on the name identifier. 
- UserGroupInformation user = hadoopCatalogOperations.getUGIByIdent(properties, ident); + UserGroupInformation user = getUGIByIdent(properties, ident); + + boolean r = + user.doAs( + (PrivilegedExceptionAction) + () -> hadoopCatalogOperations.dropSchema(ident, cascade)); + cleanUserInfo(ident); + return r; - return user.doAs( - (PrivilegedExceptionAction) - () -> hadoopCatalogOperations.dropSchema(ident, cascade)); } catch (IOException | InterruptedException ioe) { throw new RuntimeException("Failed to create fileset " + ident, ioe); } catch (UndeclaredThrowableException e) { @@ -268,4 +278,77 @@ public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException { public void close() throws IOException { hadoopCatalogOperations.close(); } + + private UserGroupInformation getUGIByIdent(Map properties, NameIdentifier ident) { + KerberosConfig kerberosConfig = new KerberosConfig(properties); + if (kerberosConfig.isKerberosAuth()) { + // We assume that the realm of catalog is the same as the realm of the schema and table. + hadoopCatalogOperations.initKerberos(properties, new Configuration(), ident); + } + // If the kerberos is not enabled (Simple mode), we will use the current user + return getUserBaseOnNameIdentifier(ident); + } + + private UserGroupInformation getUserBaseOnNameIdentifier(NameIdentifier nameIdentifier) { + UserInfo userInfo = getNearestUserGroupInformation(nameIdentifier); + if (userInfo == null) { + + // TODO(yuqi) comment the following code if the user in the docker hive image is the same as + // the user `anonymous` see: https://github.com/datastrato/gravitino/issues/4013 + try { + return UserGroupInformation.getLoginUser(); + } catch (IOException e) { + throw new RuntimeException("Failed to get login user", e); + } + } + + UserGroupInformation ugi = userInfo.loginUser; + boolean userImpersonation = userInfo.enableUserImpersonation; + if (userImpersonation) { + String proxyKerberosPrincipalName = PrincipalUtils.getCurrentUserName(); + if (!proxyKerberosPrincipalName.contains("@")) { + proxyKerberosPrincipalName = + String.format("%s@%s", proxyKerberosPrincipalName, userInfo.realm); + } + + ugi = UserGroupInformation.createProxyUser(proxyKerberosPrincipalName, ugi); + } + + return ugi; + } + + private UserInfo getNearestUserGroupInformation(NameIdentifier nameIdentifier) { + NameIdentifier currentNameIdentifier = nameIdentifier; + while (currentNameIdentifier != null) { + if (hadoopCatalogOperations.getUserInfoMap().containsKey(currentNameIdentifier)) { + return hadoopCatalogOperations.getUserInfoMap().get(currentNameIdentifier); + } + + String[] levels = currentNameIdentifier.namespace().levels(); + // The ident is catalog level. 
+ if (levels.length <= 1) { + return null; + } + currentNameIdentifier = NameIdentifier.of(currentNameIdentifier.namespace().levels()); + } + return null; + } + + private void cleanUserInfo(NameIdentifier identifier) { + UserInfo userInfo = hadoopCatalogOperations.getUserInfoMap().remove(identifier); + if (userInfo != null) { + removeFile(userInfo.keytabPath); + } + } + + private void removeFile(String filePath) { + if (filePath == null) { + return; + } + + File file = new File(filePath); + if (file.exists()) { + file.delete(); + } + } } diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java index 71808df7838..b74c1e10fbd 100644 --- a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java +++ b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java @@ -373,7 +373,7 @@ void testCreateSchemaWithKerberos() { catalog .asFilesetCatalog() .createFileset( - NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, TABLE_NAME), + NameIdentifier.of(SCHEMA_NAME, TABLE_NAME), "comment", Fileset.Type.MANAGED, null, @@ -426,7 +426,7 @@ void createTableWithKerberos() { catalog .asFilesetCatalog() .createFileset( - NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, TABLE_NAME), + NameIdentifier.of(SCHEMA_NAME, TABLE_NAME), "comment", Fileset.Type.MANAGED, null, @@ -447,7 +447,7 @@ void createTableWithKerberos() { catalog .asFilesetCatalog() .createFileset( - NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset1), + NameIdentifier.of(SCHEMA_NAME, fileset1), "comment", Fileset.Type.MANAGED, null, @@ -464,7 +464,7 @@ void createTableWithKerberos() { catalog .asFilesetCatalog() .createFileset( - NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset1), + NameIdentifier.of(SCHEMA_NAME, fileset1), "comment", Fileset.Type.MANAGED, null, @@ -480,7 +480,7 @@ void createTableWithKerberos() { catalog .asFilesetCatalog() .createFileset( - NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset2), + NameIdentifier.of(SCHEMA_NAME, fileset2), "comment", Fileset.Type.MANAGED, null, @@ -497,7 +497,7 @@ void createTableWithKerberos() { catalog .asFilesetCatalog() .createFileset( - NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset2), + NameIdentifier.of(SCHEMA_NAME, fileset2), "comment", Fileset.Type.MANAGED, null, @@ -507,32 +507,20 @@ void createTableWithKerberos() { // cli_fileset. 
Assertions.assertThrows( Exception.class, - () -> - catalog - .asFilesetCatalog() - .dropFileset(NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset1))); + () -> catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME, fileset1))); kerberosHiveContainer.executeInContainer( "hadoop", "fs", "-chown", "-R", "cli_fileset", "/user/hadoop/" + catalogName); // Now try to drop fileset Assertions.assertDoesNotThrow( - () -> - catalog - .asFilesetCatalog() - .dropFileset(NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset1))); + () -> catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME, fileset1))); Assertions.assertThrows( Exception.class, - () -> - catalog - .asFilesetCatalog() - .dropFileset(NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset2))); + () -> catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME, fileset2))); kerberosHiveContainer.executeInContainer( "hadoop", "fs", "-chown", "-R", "gravitino_client", "/user/hadoop/" + catalogName); Assertions.assertDoesNotThrow( - () -> - catalog - .asFilesetCatalog() - .dropFileset(NameIdentifier.of(metalakeName, catalogName, SCHEMA_NAME, fileset2))); + () -> catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME, fileset2))); Assertions.assertThrows( Exception.class, () -> catalog.asSchemas().dropSchema(SCHEMA_NAME, true)); diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java index 6781d756e36..3191091ee7e 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java @@ -98,7 +98,9 @@ public void startHiveContainer() { .withNetwork(network); HiveContainer container = closer.register(hiveBuilder.build()); container.start(); - container.executeInContainer("hadoop", "fs", "-chown", "-R", "anonymous", "/user/"); + // Uncomment the following code if https://github.com/datastrato/gravitino/issues/4013 is + // done. + // container.executeInContainer("hadoop", "fs", "-chown", "-R", "anonymous", "/user/"); hiveContainer = container; } } From 0b84ab3dc2238537149beada59e78b5553123731 Mon Sep 17 00:00:00 2001 From: yuqi Date: Wed, 3 Jul 2024 16:42:22 +0800 Subject: [PATCH 08/15] merge main --- .../hadoop/SecureHadoopCatalogOperations.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java index 2c294dfba69..940838b9b84 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java @@ -1,6 +1,20 @@ /* - * Copyright 2024 Datastrato Pvt Ltd. - * This software is licensed under the Apache License version 2. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package com.datastrato.gravitino.catalog.hadoop; From 95e2f97d07b1d11602bbd938216d37b58e85eeeb Mon Sep 17 00:00:00 2001 From: yuqi Date: Mon, 8 Jul 2024 20:03:35 +0800 Subject: [PATCH 09/15] Fix --- .../catalog/hadoop/HadoopCatalogOperations.java | 10 +++++----- .../catalog/hadoop/SecureHadoopCatalogOperations.java | 11 ++++++----- .../integration/test/container/ContainerSuite.java | 2 +- .../hadoop/GravitinoVirtualFileSystemIT.java | 4 ++++ 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index 8d2ef8c5d41..0cefb9e2820 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -191,11 +191,11 @@ private void initAuthentication(Map conf, Configuration hadoopCo // TODO: change the user 'datastrato' to 'anonymous' and uncomment the following code; // uncomment the following code after the user 'datastrato' is removed from the codebase. 
// for more, please see https://github.com/datastrato/gravitino/issues/4013 - // UserGroupInformation u = - // UserGroupInformation.createRemoteUser(PrincipalUtils.getCurrentUserName()); - // userInfoMap.put( - // NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name()), - // UserInfo.of(u, false, null, null)); + UserGroupInformation u = + UserGroupInformation.createRemoteUser(PrincipalUtils.getCurrentUserName()); + userInfoMap.put( + NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name()), + UserInfo.of(u, false, null, null)); } } diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java index 940838b9b84..09ce2e31571 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java @@ -309,11 +309,12 @@ private UserGroupInformation getUserBaseOnNameIdentifier(NameIdentifier nameIden // TODO(yuqi) comment the following code if the user in the docker hive image is the same as // the user `anonymous` see: https://github.com/datastrato/gravitino/issues/4013 - try { - return UserGroupInformation.getLoginUser(); - } catch (IOException e) { - throw new RuntimeException("Failed to get login user", e); - } + // try { + // return UserGroupInformation.getLoginUser(); + // } catch (IOException e) { + // throw new RuntimeException("Failed to get login user", e); + // } + throw new RuntimeException("Failed to get user information for " + nameIdentifier); } UserGroupInformation ugi = userInfo.loginUser; diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java index 703e53cb255..ad240e3ab0c 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java @@ -114,7 +114,7 @@ public void startHiveContainer() { container.start(); // Uncomment the following code if https://github.com/datastrato/gravitino/issues/4013 is // done. 
- // container.executeInContainer("hadoop", "fs", "-chown", "-R", "anonymous", "/user/"); + container.executeInContainer("hadoop", "fs", "-chown", "-R", "anonymous", "/user/"); hiveContainer = container; } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/filesystem/hadoop/GravitinoVirtualFileSystemIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/filesystem/hadoop/GravitinoVirtualFileSystemIT.java index 154b61f0dab..10e44788d28 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/filesystem/hadoop/GravitinoVirtualFileSystemIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/filesystem/hadoop/GravitinoVirtualFileSystemIT.java @@ -66,6 +66,10 @@ public class GravitinoVirtualFileSystemIT extends AbstractIT { @BeforeAll public static void startUp() { containerSuite.startHiveContainer(); + containerSuite + .getHiveContainer() + .executeInContainer("hadoop", "fs", "-chown", "-R", "anonymous", "/"); + Assertions.assertFalse(client.metalakeExists(metalakeName)); metalake = client.createMetalake(metalakeName, "metalake comment", Collections.emptyMap()); Assertions.assertTrue(client.metalakeExists(metalakeName)); From 5ef2ced300956f6a7a9cd995c41d6b7e09b90697 Mon Sep 17 00:00:00 2001 From: yuqi Date: Mon, 8 Jul 2024 20:41:27 +0800 Subject: [PATCH 10/15] Fix python test. --- .../tests/integration/hdfs_container.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/clients/client-python/tests/integration/hdfs_container.py b/clients/client-python/tests/integration/hdfs_container.py index 34bc41d8641..0882d9a0d1f 100644 --- a/clients/client-python/tests/integration/hdfs_container.py +++ b/clients/client-python/tests/integration/hdfs_container.py @@ -70,6 +70,17 @@ async def check_hdfs_container_status(hdfs_container): ) from e +def check_owner_to_anonymous(hdfs_container): + command_and_args = ["hadoop", "fs", "-chown", "-R", "anonymous", "/"] + exec_result = hdfs_container.exec_run(command_and_args) + if exec_result.exit_code != 0: + message = f"Command {command_and_args} exited with {exec_result.exit_code}" + logger.warning(message) + logger.warning("output: %s", exec_result.output) + return False + return True + + class HDFSContainer: _docker_client = None _container = None @@ -106,6 +117,11 @@ def __init__(self): self._fetch_ip() + if not check_owner_to_anonymous(self._container): + raise GravitinoRuntimeException( + "Failed to change the owner of the root directory to anonymous." 
+ ) + def _create_networks(self): pool_config = tp.IPAMPool(subnet="10.20.31.16/28") ipam_config = tp.IPAMConfig(driver="default", pool_configs=[pool_config]) From 574365a21fe642eecf812916358f1fc9b2f4a702 Mon Sep 17 00:00:00 2001 From: yuqi Date: Mon, 8 Jul 2024 21:04:00 +0800 Subject: [PATCH 11/15] Fix --- integration-test/trino-it/init/hive/init.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration-test/trino-it/init/hive/init.sh b/integration-test/trino-it/init/hive/init.sh index b909fff8658..2a214064898 100755 --- a/integration-test/trino-it/init/hive/init.sh +++ b/integration-test/trino-it/init/hive/init.sh @@ -22,3 +22,5 @@ IP=$(hostname -I | awk '{print $1}') sed -i "s|hdfs://localhost:9000|hdfs://${IP}:9000|g" /usr/local/hive/conf/hive-site.xml /bin/bash /usr/local/sbin/start.sh + +hdfs dfs -chown -R anonymous /user From e2f47bbce26ac7bf298e7751e0173deb869feaa9 Mon Sep 17 00:00:00 2001 From: yuqi Date: Mon, 8 Jul 2024 21:25:41 +0800 Subject: [PATCH 12/15] Fix --- integration-test/trino-it/init/hive/init.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/trino-it/init/hive/init.sh b/integration-test/trino-it/init/hive/init.sh index 2a214064898..0faf677bb28 100755 --- a/integration-test/trino-it/init/hive/init.sh +++ b/integration-test/trino-it/init/hive/init.sh @@ -23,4 +23,4 @@ sed -i "s|hdfs://localhost:9000|hdfs://${IP}:9000|g" /usr/local/hi /bin/bash /usr/local/sbin/start.sh -hdfs dfs -chown -R anonymous /user +hdfs dfs -chown -R anonymous / From 5d085644711d2179b2a41bb8260f25856fa61fff Mon Sep 17 00:00:00 2001 From: yuqi Date: Mon, 8 Jul 2024 21:53:06 +0800 Subject: [PATCH 13/15] Fix --- integration-test/trino-it/init/hive/init.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/trino-it/init/hive/init.sh b/integration-test/trino-it/init/hive/init.sh index 0faf677bb28..b02ffd4d575 100755 --- a/integration-test/trino-it/init/hive/init.sh +++ b/integration-test/trino-it/init/hive/init.sh @@ -23,4 +23,4 @@ sed -i "s|hdfs://localhost:9000|hdfs://${IP}:9000|g" /usr/local/hi /bin/bash /usr/local/sbin/start.sh -hdfs dfs -chown -R anonymous / +hadoop fs -chown -R anonymous /user \ No newline at end of file From 62dac4b955c1125d5f6d2444da3750783081f330 Mon Sep 17 00:00:00 2001 From: yuqi Date: Mon, 8 Jul 2024 22:17:15 +0800 Subject: [PATCH 14/15] Fix --- integration-test/trino-it/init/hive/init.sh | 2 +- integration-test/trino-it/launch.sh | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/integration-test/trino-it/init/hive/init.sh b/integration-test/trino-it/init/hive/init.sh index b02ffd4d575..389e087303a 100755 --- a/integration-test/trino-it/init/hive/init.sh +++ b/integration-test/trino-it/init/hive/init.sh @@ -23,4 +23,4 @@ sed -i "s|hdfs://localhost:9000|hdfs://${IP}:9000|g" /usr/local/hi /bin/bash /usr/local/sbin/start.sh -hadoop fs -chown -R anonymous /user \ No newline at end of file +hadoop fs -chown -R anonymous / \ No newline at end of file diff --git a/integration-test/trino-it/launch.sh b/integration-test/trino-it/launch.sh index ca51922e281..6f1f2de90ba 100755 --- a/integration-test/trino-it/launch.sh +++ b/integration-test/trino-it/launch.sh @@ -76,4 +76,5 @@ echo "All docker compose service is now available." 
# change the hive container's logs directory permission docker exec trino-ci-hive chown -R `id -u`:`id -g` /tmp/root docker exec trino-ci-hive chown -R `id -u`:`id -g` /usr/local/hadoop/logs +docker exec trino-ci-hive hadoop fs -chown -R anonymous / From e8d304c25b816c13f8ba351544147823a57e521d Mon Sep 17 00:00:00 2001 From: yuqi Date: Mon, 8 Jul 2024 23:22:19 +0800 Subject: [PATCH 15/15] Fix --- .../integration/test/web/ui/CatalogsPageTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/web/ui/CatalogsPageTest.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/web/ui/CatalogsPageTest.java index 6746d34ec80..6609c26533e 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/web/ui/CatalogsPageTest.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/web/ui/CatalogsPageTest.java @@ -29,6 +29,8 @@ import com.datastrato.gravitino.integration.test.container.ContainerSuite; import com.datastrato.gravitino.integration.test.container.TrinoITContainers; import com.datastrato.gravitino.integration.test.util.AbstractIT; +import com.datastrato.gravitino.integration.test.util.CommandExecutor; +import com.datastrato.gravitino.integration.test.util.ProcessData; import com.datastrato.gravitino.integration.test.web.ui.pages.CatalogsPage; import com.datastrato.gravitino.integration.test.web.ui.pages.MetalakePage; import com.datastrato.gravitino.integration.test.web.ui.utils.AbstractWebIT; @@ -612,6 +614,12 @@ public void testSelectMetalake() throws InterruptedException { @Test @Order(21) public void testFilesetCatalogTreeNode() throws InterruptedException { + Object o = + CommandExecutor.executeCommandLocalHost( + "docker exec trino-ci-hive hadoop fs -chown -R anonymous /", + false, + ProcessData.TypesOfData.STREAMS_MERGED); + LOG.info("Command result: {}", o); // 1. create schema and fileset of fileset catalog createSchema(METALAKE_NAME, FILESET_CATALOG_NAME, SCHEMA_NAME_FILESET); createFileset(METALAKE_NAME, FILESET_CATALOG_NAME, SCHEMA_NAME_FILESET, FILESET_NAME);
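
Note on the technique running through this series: each commit builds toward per-level (catalog, schema, fileset) authentication. Keytabs are materialized per NameIdentifier, the login UGI is cached in a UserInfo map, and SecureHadoopCatalogOperations resolves credentials by walking from the fileset identifier up to its schema and then the catalog, optionally wrapping the result in a proxy user when impersonation is enabled. The sketch below is not the patched code itself; it is a minimal, self-contained illustration of that nearest-ancestor lookup, with Ident and User as hypothetical stand-ins for Gravitino's NameIdentifier and UserInfo types.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class NearestUserLookupSketch {

  // Hypothetical stand-in for Gravitino's NameIdentifier.
  record Ident(List<String> levels) {
    Ident parent() {
      // A single-level identifier is the catalog itself; there is nothing above it.
      return levels.size() <= 1 ? null : new Ident(levels.subList(0, levels.size() - 1));
    }
  }

  // Hypothetical stand-in for the UserInfo entries kept by the catalog operations.
  record User(String principal, boolean impersonation) {}

  private final Map<Ident, User> userInfoMap = new ConcurrentHashMap<>();

  void register(Ident ident, User user) {
    userInfoMap.put(ident, user);
  }

  // Walk fileset -> schema -> catalog and return the most specific credentials found.
  User nearestUser(Ident ident) {
    for (Ident cur = ident; cur != null; cur = cur.parent()) {
      User user = userInfoMap.get(cur);
      if (user != null) {
        return user;
      }
    }
    // No level configured Kerberos; the caller falls back to simple authentication.
    return null;
  }
}

Usage follows the same most-specific-wins rule the patch relies on: registering a catalog-level entry such as register(new Ident(List.of("metalake", "catalog")), catalogUser) makes a lookup for Ident(List.of("metalake", "catalog", "schema", "fileset")) fall back to that entry unless a schema- or fileset-level principal was registered. This is also why dropFileset and dropSchema in the patch remove the corresponding UserInfo entry and delete its keytab file, so stale fileset credentials cannot shadow the catalog ones.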