Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TEST] Test authentication #4103

Closed
wants to merge 21 commits into the base branch from the head branch
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion catalogs/catalog-hadoop/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ tasks.test {

doFirst {
environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12")
environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-kerberos-hive:0.1.2")
environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-kerberos-hive:0.1.3")
}

val init = project.extra.get("initIntegrationTest") as (Test) -> Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public String shortName() {

// Factory hook: builds the CatalogOperations instance backing this Hadoop catalog.
@Override
protected CatalogOperations newOps(Map<String, String> config) {
// NOTE(review): the next line is the pre-change implementation retained by the diff capture;
// together with the line after it this would not compile (duplicate local 'ops') — confirm
// that only the SecureHadoopCatalogOperations line is kept in the merged file.
HadoopCatalogOperations ops = new HadoopCatalogOperations();
CatalogOperations ops = new SecureHadoopCatalogOperations();
return ops;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosClient;
import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
import com.datastrato.gravitino.connector.CatalogInfo;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.HasPropertyMetadata;
Expand All @@ -53,7 +54,9 @@
import com.datastrato.gravitino.utils.PrincipalUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -96,6 +99,14 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem

private CatalogInfo catalogInfo;

private final List<Closeable> closeables = Lists.newArrayList();

private final Map<NameIdentifier, UserInfo> userInfoMap = Maps.newConcurrentMap();

public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-%s";

private final ThreadLocal<String> currentUser = ThreadLocal.withInitial(() -> null);

HadoopCatalogOperations(EntityStore store) {
this.store = store;
}
Expand All @@ -108,6 +119,38 @@ public String getKerberosRealm() {
return kerberosRealm;
}

/** Returns the entity store used by this catalog to persist schema and fileset metadata. */
public EntityStore getStore() {
return store;
}

/**
 * Records the API user for the current thread; create-operations use this value (falling back
 * to the current principal when unset) as the audit creator.
 *
 * <p>NOTE(review): the value is stored in a ThreadLocal and is not cleared here — presumably
 * the caller resets it after the operation; confirm against the calling code.
 */
public void setCurrentUser(String userName) {
currentUser.set(userName);
}

/**
 * Returns the cache of login state, keyed by catalog/schema identifier.
 *
 * <p>Note: this exposes the internal mutable concurrent map directly; callers may mutate it.
 */
public Map<NameIdentifier, UserInfo> getUserInfoMap() {
return userInfoMap;
}

/**
 * Holder for the authentication state associated with one catalog or schema identifier.
 *
 * <p>Instances are built via {@link #of} and cached in {@code userInfoMap}; the fields are
 * package-private and read directly by the surrounding operations code.
 */
static class UserInfo {
  // UGI that performed the login for this identifier.
  UserGroupInformation loginUser;
  // Whether proxy-user impersonation is enabled for this login.
  boolean enableUserImpersonation;
  // Path of the keytab used for the login (null when no keytab is involved).
  String keytabPath;
  // Kerberos realm returned by the login (null for simple authentication).
  String realm;

  /** Builds a fully-populated {@link UserInfo}; no validation is performed on the arguments. */
  static UserInfo of(
      UserGroupInformation loginUser,
      boolean enableUserImpersonation,
      String keytabPath,
      String kerberosRealm) {
    UserInfo info = new UserInfo();
    info.realm = kerberosRealm;
    info.keytabPath = keytabPath;
    info.enableUserImpersonation = enableUserImpersonation;
    info.loginUser = loginUser;
    return info;
  }
}

@Override
public void initialize(
Map<String, String> config, CatalogInfo info, HasPropertyMetadata propertiesMetadata)
Expand Down Expand Up @@ -139,20 +182,20 @@ public void initialize(

private void initAuthentication(Map<String, String> conf, Configuration hadoopConf) {
AuthenticationConfig config = new AuthenticationConfig(conf);
String authType = config.getAuthType();

if (StringUtils.equalsIgnoreCase(authType, AuthenticationMethod.KERBEROS.name())) {
hadoopConf.set(
HADOOP_SECURITY_AUTHENTICATION,
AuthenticationMethod.KERBEROS.name().toLowerCase(Locale.ROOT));
UserGroupInformation.setConfiguration(hadoopConf);
try {
KerberosClient kerberosClient = new KerberosClient(conf, hadoopConf);
File keytabFile = kerberosClient.saveKeyTabFileFromUri(catalogInfo.id());
this.kerberosRealm = kerberosClient.login(keytabFile.getAbsolutePath());
} catch (IOException e) {
throw new RuntimeException("Failed to login with Kerberos", e);
}
if (config.isKerberosAuth()) {
this.kerberosRealm =
initKerberos(
conf, hadoopConf, NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name()));
} else if (config.isSimpleAuth()) {
// TODO: change the user 'datastrato' to 'anonymous' and uncomment the following code;
// uncomment the following code after the user 'datastrato' is removed from the codebase.
// for more, please see https://github.com/datastrato/gravitino/issues/4013
UserGroupInformation u =
UserGroupInformation.createRemoteUser(PrincipalUtils.getCurrentUserName());
userInfoMap.put(
NameIdentifier.of(catalogInfo.namespace(), catalogInfo.name()),
UserInfo.of(u, false, null, null));
}
}

Expand Down Expand Up @@ -204,6 +247,11 @@ public Fileset createFileset(
String storageLocation,
Map<String, String> properties)
throws NoSuchSchemaException, FilesetAlreadyExistsException {

String apiUser =
currentUser.get() == null ? PrincipalUtils.getCurrentUserName() : currentUser.get();

// Reset the current user based on the name identifier.
try {
if (store.exists(ident, Entity.EntityType.FILESET)) {
throw new FilesetAlreadyExistsException("Fileset %s already exists", ident);
Expand All @@ -228,8 +276,8 @@ public Fileset createFileset(
"Storage location must be set for external fileset " + ident);
}

// Either catalog property "location", or schema property "location", or storageLocation must be
// set for managed fileset.
// Either catalog property "location", or schema property "location", or
// storageLocation must be set for managed fileset.
Path schemaPath = getSchemaPath(schemaIdent.name(), schemaEntity.properties());
if (schemaPath == null && StringUtils.isBlank(storageLocation)) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -274,16 +322,14 @@ public Fileset createFileset(
.withNamespace(ident.namespace())
.withComment(comment)
.withFilesetType(type)
// Store the storageLocation to the store. If the "storageLocation" is null for
// Store the storageLocation to the store. If the "storageLocation" is null
// for
// managed fileset, Gravitino will get and store the location based on the
// catalog/schema's location and store it to the store.
.withStorageLocation(filesetPath.toString())
.withProperties(properties)
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
AuditInfo.builder().withCreator(apiUser).withCreateTime(Instant.now()).build())
.build();

try {
Expand Down Expand Up @@ -384,9 +430,56 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc
}
}

/**
 * Performs a Kerberos login for the given identifier using the supplied properties, caches the
 * resulting login state in {@code userInfoMap}, and returns the Kerberos realm.
 *
 * <p>Thread-safety: {@link UserGroupInformation} keeps process-wide static state, so this method
 * is {@code synchronized} to make setConfiguration + login + getLoginUser atomic.
 *
 * @param properties catalog- or schema-level configuration carrying the Kerberos settings
 * @param configuration Hadoop configuration that is switched to Kerberos authentication
 * @param ident identifier (catalog or schema) under which the login state is cached
 * @return the realm of the logged-in principal, or {@code null} when Kerberos authentication is
 *     not enabled in {@code properties}
 */
public synchronized String initKerberos(
Map<String, String> properties, Configuration configuration, NameIdentifier ident) {
// Build a keytab path unique to this catalog + identifier (dots replaced so the
// identifier is filesystem-safe).
String keytabPath =
String.format(
GRAVITINO_KEYTAB_FORMAT, catalogInfo.id() + "-" + ident.toString().replace(".", "-"));
KerberosConfig kerberosConfig = new KerberosConfig(properties);
if (kerberosConfig.isKerberosAuth()) {
configuration.set(
HADOOP_SECURITY_AUTHENTICATION,
AuthenticationMethod.KERBEROS.name().toLowerCase(Locale.ROOT));
UserGroupInformation.setConfiguration(configuration);

try {
KerberosClient kerberosClient = new KerberosClient(properties, configuration);
// Track the client so close() can release its resources later.
closeables.add(kerberosClient);

File keytabFile = kerberosClient.saveKeyTabFileFromUri(keytabPath);
kerberosRealm = kerberosClient.login(keytabFile.getAbsolutePath());
// NOTE(review): open question from the author — should this realm be required to match
// the realm embedded in the principal? Confirm and enforce if so.
userInfoMap.put(
ident,
UserInfo.of(
UserGroupInformation.getLoginUser(),
kerberosConfig.isImpersonationEnabled(),
keytabPath,
kerberosRealm));
return kerberosRealm;
} catch (IOException e) {
throw new RuntimeException("Failed to login with Kerberos", e);
}
}

// Kerberos not configured for this identifier: nothing is cached and no realm is returned.
return null;
}

@Override
public Schema createSchema(NameIdentifier ident, String comment, Map<String, String> properties)
throws NoSuchCatalogException, SchemaAlreadyExistsException {

String apiUser =
currentUser.get() == null ? PrincipalUtils.getCurrentUserName() : currentUser.get();

try {
if (store.exists(ident, Entity.EntityType.SCHEMA)) {
throw new SchemaAlreadyExistsException("Schema %s already exists", ident);
Expand Down Expand Up @@ -427,10 +520,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map<String, Str
.withComment(comment)
.withProperties(properties)
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
AuditInfo.builder().withCreator(apiUser).withCreateTime(Instant.now()).build())
.build();
try {
store.put(schemaEntity, true /* overwrite */);
Expand Down Expand Up @@ -532,14 +622,23 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty

LOG.info("Deleted schema {} location {}", ident, schemaPath);
return true;

} catch (IOException ioe) {
throw new RuntimeException("Failed to delete schema " + ident + " location", ioe);
}
}

@Override
public void close() throws IOException {}
/**
 * Releases resources held by this catalog: clears all cached login/user state and closes every
 * tracked {@link Closeable} (e.g. Kerberos clients) on a best-effort basis.
 *
 * <p>A failure while closing one resource is logged and does not prevent the remaining
 * resources from being closed; no exception is propagated to the caller.
 */
public void close() throws IOException {
  userInfoMap.clear();
  for (Closeable closeable : closeables) {
    try {
      closeable.close();
    } catch (IOException e) {
      LOG.error("Failed to close resource", e);
    }
  }
}

private SchemaEntity updateSchemaEntity(
NameIdentifier ident, SchemaEntity schemaEntity, SchemaChange... changes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@
*/
package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
import com.datastrato.gravitino.connector.BasePropertiesMetadata;
import com.datastrato.gravitino.connector.PropertyEntry;
import java.util.Collections;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

/** Property metadata for Hadoop filesets: exposes Kerberos and authentication entries. */
public class HadoopFilesetPropertiesMetadata extends BasePropertiesMetadata {

@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
// NOTE(review): the next line is the pre-change implementation retained by the diff capture;
// it would make the builder code below unreachable — confirm only the builder version is
// kept in the merged file.
return Collections.emptyMap();
ImmutableMap.Builder<String, PropertyEntry<?>> builder = ImmutableMap.builder();
builder.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
builder.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopProxyPlugin implements ProxyPlugin {
private HadoopCatalogOperations ops;
private UserGroupInformation realUser;
private SecureHadoopCatalogOperations ops;
private final UserGroupInformation realUser;

public HadoopProxyPlugin() {
try {
Expand Down Expand Up @@ -82,7 +82,7 @@ public Object doAs(

// Binds the catalog operations to this proxy plugin and registers the plugin back on them.
@Override
public void bindCatalogOperation(CatalogOperations ops) {
// NOTE(review): the next line is the pre-change cast retained by the diff capture; it is
// superseded by the SecureHadoopCatalogOperations cast below — confirm only the second
// assignment is kept in the merged file.
this.ops = ((HadoopCatalogOperations) ops);
this.ops = ((SecureHadoopCatalogOperations) ops);
this.ops.setProxyPlugin(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
import com.datastrato.gravitino.connector.BasePropertiesMetadata;
import com.datastrato.gravitino.connector.PropertyEntry;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -45,6 +47,8 @@ public class HadoopSchemaPropertiesMetadata extends BasePropertiesMetadata {
true /* immutable */,
null,
false /* hidden */))
.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
.build();

@Override
Expand Down
Loading
Loading