Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.glue.catalog.util.AWSGlueConfig;
import com.amazonaws.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -39,13 +41,28 @@ public AWSCredentials getCredentials() {
String accessKey = StringUtils.trim(conf.get(AWSGlueConfig.AWS_GLUE_ACCESS_KEY));
String secretKey = StringUtils.trim(conf.get(AWSGlueConfig.AWS_GLUE_SECRET_KEY));
String sessionToken = StringUtils.trim(conf.get(AWSGlueConfig.AWS_GLUE_SESSION_TOKEN));
String roleArn = StringUtils.trim(conf.get(AWSGlueConfig.AWS_GLUE_ROLE_ARN));
String externalId = StringUtils.trim(conf.get(AWSGlueConfig.AWS_GLUE_EXTERNAL_ID));
if (!StringUtils.isNullOrEmpty(accessKey) && !StringUtils.isNullOrEmpty(secretKey)) {
return (StringUtils.isNullOrEmpty(sessionToken) ? new BasicAWSCredentials(accessKey,
secretKey) : new BasicSessionCredentials(accessKey, secretKey, sessionToken));
} else {
throw new SdkClientException(
"Unable to load AWS credentials from hive conf (aws.glue.access-key and aws.glue.secret-key)");
}

AWSCredentialsProvider longLivedProvider = new DefaultAWSCredentialsProviderChain();

if (!StringUtils.isNullOrEmpty(roleArn)) {
STSAssumeRoleSessionCredentialsProvider.Builder builder =
new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, "local-session")
.withLongLivedCredentialsProvider(longLivedProvider);

if (!StringUtils.isNullOrEmpty(externalId)) {
builder.withExternalId(externalId);
}
STSAssumeRoleSessionCredentialsProvider provider = builder.build();
return provider.getCredentials();
}
throw new SdkClientException("Unable to load AWS credentials from any provider in the chain");

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ private AWSGlueConfig() {
public static final String AWS_GLUE_ACCESS_KEY = "aws.glue.access-key";
public static final String AWS_GLUE_SECRET_KEY = "aws.glue.secret-key";
public static final String AWS_GLUE_SESSION_TOKEN = "aws.glue.session-token";
public static final String AWS_GLUE_ROLE_ARN = "aws.glue.role-arn";
public static final String AWS_GLUE_EXTERNAL_ID = "aws.glue.external-id";
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,57 @@ public ParamRules forbidIf(String a, String expectedValue, String[] forbiddenVal
return this;
}

/**
 * Registers an all-or-nothing rule: the supplied values must either all be present
 * or all be absent. Presence of each value is decided by {@code isPresent}.
 * <p>
 * The check is not performed here; it is added to the rule list and runs when the
 * rules are evaluated.
 *
 * @param values       the values that have to be supplied as a group
 * @param errorMessage message of the {@link IllegalArgumentException} raised when only
 *                     some of the values are present
 * @return this ParamRules instance, so calls can be chained
 */
public ParamRules requireTogether(String[] values, String errorMessage) {
    rules.add(() -> {
        // Count how many entries are present; every entry is inspected.
        int presentCount = 0;
        for (String candidate : values) {
            if (isPresent(candidate)) {
                presentCount++;
            }
        }

        // A violation is "some but not all": strictly between none and every entry.
        boolean partiallySet = presentCount > 0 && presentCount < values.length;
        if (partiallySet) {
            throw new IllegalArgumentException(errorMessage);
        }
    });
    return this;
}

/**
 * Registers a rule that is satisfied as soon as any one of the supplied values is
 * present, where presence is decided by {@code isPresent}.
 * <p>
 * The check is not performed here; it is added to the rule list and runs when the
 * rules are evaluated.
 *
 * @param values       the candidate values, of which one or more must be present
 * @param errorMessage message of the {@link IllegalArgumentException} raised when
 *                     no value is present
 * @return this ParamRules instance, so calls can be chained
 */
public ParamRules requireAtLeastOne(String[] values, String errorMessage) {
    rules.add(() -> {
        for (String candidate : values) {
            if (isPresent(candidate)) {
                // First present value satisfies the rule; later entries are not inspected.
                return;
            }
        }
        // Reached only when the loop found nothing present (including an empty array).
        throw new IllegalArgumentException(errorMessage);
    });
    return this;
}


// --------- Utility Methods ----------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,11 @@ public class AWSGlueMetaStoreBaseProperties {
protected String glueSessionToken = "";

@ConnectorProperty(names = {"glue.role_arn"},
description = "The IAM role the AWS Glue.",
supported = false)
description = "The IAM role the AWS Glue.")
protected String glueIAMRole = "";

@ConnectorProperty(names = {"glue.external_id"},
description = "The external id of the AWS Glue.",
supported = false)
description = "The external id of the AWS Glue.")
protected String glueExternalId = "";

public static AWSGlueMetaStoreBaseProperties of(Map<String, String> properties) {
Expand Down Expand Up @@ -96,12 +94,12 @@ public static AWSGlueMetaStoreBaseProperties of(Map<String, String> properties)

private ParamRules buildRules() {

return new ParamRules()
.require(glueAccessKey,
"glue.access_key is required")
.require(glueSecretKey,
"glue.secret_key is required")
.require(glueEndpoint, "glue.endpoint is required");
return new ParamRules().requireTogether(new String[]{glueAccessKey, glueSecretKey},
"glue.access_key and glue.secret_key must be set together")
.requireAtLeastOne(new String[]{glueAccessKey, glueIAMRole},
"At least one of glue.access_key or glue.role_arn must be set")
.requireAtLeastOne(new String[]{glueEndpoint, glueRegion},
"At least one of glue.endpoint or glue.region must be set");
}

private void checkAndInit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.datasource.property.ConnectorProperty;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.glue.catalog.util.AWSGlueConfig;
import org.apache.hadoop.hive.conf.HiveConf;

import java.util.Map;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class HMSGlueMetaStoreProperties extends AbstractHMSProperties {
protected String awsGlueCatalogSeparator = "";

// ========== Constructor ==========

/**
* Constructs an instance with the given metastore type and original properties.
*
Expand All @@ -98,9 +100,6 @@ private void initHiveConf() {
hiveConf = new HiveConf();
hiveConf.set(AWS_GLUE_ENDPOINT_KEY, baseProperties.glueEndpoint);
hiveConf.set(AWS_REGION_KEY, baseProperties.glueRegion);
hiveConf.set(AWS_GLUE_SESSION_TOKEN_KEY, baseProperties.glueSessionToken);
hiveConf.set(AWS_GLUE_ACCESS_KEY_KEY, baseProperties.glueAccessKey);
hiveConf.set(AWS_GLUE_SECRET_KEY_KEY, baseProperties.glueSecretKey);
hiveConf.set(AWS_GLUE_MAX_RETRY_KEY, String.valueOf(awsGlueMaxErrorRetries));
hiveConf.set(AWS_GLUE_MAX_CONNECTIONS_KEY, String.valueOf(awsGlueMaxConnections));
hiveConf.set(AWS_GLUE_CONNECTION_TIMEOUT_KEY, String.valueOf(awsGlueConnectionTimeout));
Expand All @@ -109,6 +108,17 @@ private void initHiveConf() {
hiveConf.set(AWS_CATALOG_CREDENTIALS_PROVIDER_FACTORY_CLASS_KEY,
"com.amazonaws.glue.catalog.credentials.ConfigurationAWSCredentialsProviderFactory");
hiveConf.set("hive.metastore.type", "glue");
setHiveConfPropertiesIfNotNull(hiveConf, AWSGlueConfig.AWS_GLUE_ACCESS_KEY, baseProperties.glueAccessKey);
setHiveConfPropertiesIfNotNull(hiveConf, AWSGlueConfig.AWS_GLUE_SECRET_KEY, baseProperties.glueSecretKey);
setHiveConfPropertiesIfNotNull(hiveConf, AWSGlueConfig.AWS_GLUE_SESSION_TOKEN, baseProperties.glueSessionToken);
setHiveConfPropertiesIfNotNull(hiveConf, AWSGlueConfig.AWS_GLUE_ROLE_ARN, baseProperties.glueIAMRole);
setHiveConfPropertiesIfNotNull(hiveConf, AWSGlueConfig.AWS_GLUE_EXTERNAL_ID, baseProperties.glueExternalId);
}

/**
 * Writes one key/value pair into the given HiveConf, doing nothing when the value is null.
 * Only {@code null} is filtered out; an empty string is still written.
 *
 * @param hiveConf configuration to write into
 * @param key      configuration key
 * @param value    configuration value, may be null
 */
private static void setHiveConfPropertiesIfNotNull(HiveConf hiveConf, String key, String value) {
    if (value == null) {
        return;
    }
    hiveConf.set(key, value);
}

public HMSGlueMetaStoreProperties(Map<String, String> origProps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,31 @@ private void appendS3Props(Map<String, String> props) {

private void appendGlueProps(Map<String, String> props) {
props.put(AwsProperties.GLUE_CATALOG_ENDPOINT, glueProperties.glueEndpoint);
props.put("client.credentials-provider",
"com.amazonaws.glue.catalog.credentials.ConfigurationAWSCredentialsProvider2x");
props.put("client.credentials-provider.glue.access_key", glueProperties.glueAccessKey);
props.put("client.credentials-provider.glue.secret_key", glueProperties.glueSecretKey);
props.put("aws.catalog.credentials.provider.factory.class",
"com.amazonaws.glue.catalog.credentials.ConfigurationAWSCredentialsProviderFactory");

if (StringUtils.isNotBlank(glueProperties.glueAccessKey) && StringUtils
.isNotBlank(glueProperties.glueSecretKey)) {
props.put("client.credentials-provider",
"com.amazonaws.glue.catalog.credentials.ConfigurationAWSCredentialsProvider2x");
props.put("client.credentials-provider.glue.access_key", glueProperties.glueAccessKey);
props.put("client.credentials-provider.glue.secret_key", glueProperties.glueSecretKey);
props.put("aws.catalog.credentials.provider.factory.class",
"com.amazonaws.glue.catalog.credentials.ConfigurationAWSCredentialsProviderFactory");
if (StringUtils.isNotBlank(glueProperties.glueSessionToken)) {
props.put("client.credentials-provider.glue.session_token", glueProperties.glueSessionToken);
}
return;
}
//IAM Assume Role
if (StringUtils.isNotBlank(glueProperties.glueIAMRole)) {
props.put(AwsProperties.CLIENT_FACTORY,
"org.apache.iceberg.aws.AssumeRoleAwsClientFactory");
props.put("aws.region", glueProperties.glueRegion);

props.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, glueProperties.glueIAMRole);
props.put(AwsProperties.CLIENT_ASSUME_ROLE_REGION, glueProperties.glueRegion);
if (StringUtils.isNotBlank(glueProperties.glueExternalId)) {
props.put(AwsProperties.CLIENT_ASSUME_ROLE_EXTERNAL_ID, glueProperties.glueExternalId);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,15 @@ private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
hadoopStorageConfig.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
hadoopStorageConfig.set("fs.s3a.endpoint", getEndpoint());
hadoopStorageConfig.set("fs.s3a.endpoint.region", getRegion());
hadoopStorageConfig.set("fs.s3a.access.key", getAccessKey());
hadoopStorageConfig.set("fs.s3a.secret.key", getSecretKey());
hadoopStorageConfig.set("fs.s3.impl.disable.cache", "true");
hadoopStorageConfig.set("fs.s3a.impl.disable.cache", "true");
if (StringUtils.isNotBlank(getAccessKey())) {
hadoopStorageConfig.set("fs.s3a.access.key", getAccessKey());
hadoopStorageConfig.set("fs.s3a.secret.key", getSecretKey());
if (StringUtils.isNotBlank(getSessionToken())) {
hadoopStorageConfig.set("fs.s3a.session.token", getSessionToken());
}
}
hadoopStorageConfig.set("fs.s3a.connection.maximum", getMaxConnections());
hadoopStorageConfig.set("fs.s3a.connection.request.timeout", getRequestTimeoutS());
hadoopStorageConfig.set("fs.s3a.connection.timeout", getConnectionTimeoutS());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ public class S3Properties extends AbstractS3CompatibleProperties {
description = "The sts region of S3.")
protected String s3StsRegion = "";

@ConnectorProperty(names = {"s3.role_arn", "AWS_ROLE_ARN"},
@ConnectorProperty(names = {"s3.role_arn", "AWS_ROLE_ARN", "glue.role_arn"},
required = false,
description = "The iam role of S3.")
protected String s3IAMRole = "";

@ConnectorProperty(names = {"s3.external_id", "AWS_EXTERNAL_ID"},
@ConnectorProperty(names = {"s3.external_id", "AWS_EXTERNAL_ID", "glue.external_id"},
required = false,
description = "The external id of S3.")
protected String s3ExternalId = "";
Expand Down Expand Up @@ -345,17 +345,17 @@ protected String getEndpointFromRegion() {

/**
* ===========================================
* NOTICE:
* This parameter is still used for Cloud-related features,
* although it is no longer recommended.
*
* Reason:
* - Cloud may access S3-compatible object storage via the S3 protocol.
* - The exact behavior has not yet been fully clarified.
*
* Therefore:
* - We cannot directly replace it with the new parameter.
* - This redundant parameter is temporarily kept for compatibility.
* NOTICE:
* This parameter is still used for Cloud-related features,
* although it is no longer recommended.
* <p>
* Reason:
* - Cloud may access S3-compatible object storage via the S3 protocol.
* - The exact behavior has not yet been fully clarified.
* <p>
* Therefore:
* - We cannot directly replace it with the new parameter.
* - This redundant parameter is temporarily kept for compatibility.
* ===========================================
*/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.doris.datasource.property;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;

Expand Down Expand Up @@ -115,4 +115,36 @@ void testValidateWithPrefix() {
() -> rules.validate("Config Error"));
Assertions.assertEquals("Config Error: Missing value", e.getMessage());
}

@Test
void testRequireTogether() {
    final String message = "Both accessKey and secretKey are required together";

    // Only one of the two values is set: validation has to fail with the configured message.
    ParamRules partiallySet = new ParamRules()
            .requireTogether(new String[]{"accessKey", ""}, message);
    IllegalArgumentException failure = Assertions.assertThrows(IllegalArgumentException.class,
            () -> partiallySet.validate());
    Assertions.assertEquals(message, failure.getMessage());

    // Both values are set: validation passes.
    ParamRules fullySet = new ParamRules()
            .requireTogether(new String[]{"accessKey", "secretKey"}, message);
    Assertions.assertDoesNotThrow(() -> fullySet.validate());
}

@Test
void testAtLeastOne() {
    final String message = "At least one of accessKey and iamrole is required";

    // No value present at all: validation has to fail with the configured message.
    ParamRules nonePresent = new ParamRules()
            .requireAtLeastOne(new String[]{""}, message);
    IllegalArgumentException failure = Assertions.assertThrows(IllegalArgumentException.class,
            () -> nonePresent.validate());
    Assertions.assertEquals(message, failure.getMessage());

    // Both values present.
    ParamRules bothPresent = new ParamRules()
            .requireAtLeastOne(new String[]{"accessKey", "iamrole"}, message);
    Assertions.assertDoesNotThrow(() -> bothPresent.validate());

    // Only the first value present.
    ParamRules firstOnly = new ParamRules()
            .requireAtLeastOne(new String[]{"accessKey", ""}, message);
    Assertions.assertDoesNotThrow(() -> firstOnly.validate());

    // Only the second value present.
    ParamRules secondOnly = new ParamRules()
            .requireAtLeastOne(new String[]{"", "iamrole"}, message);
    Assertions.assertDoesNotThrow(() -> secondOnly.validate());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void testMissingSecretKeyThrows() {
IllegalArgumentException.class,
() -> AWSGlueMetaStoreBaseProperties.of(props)
);
Assertions.assertTrue(ex.getMessage().contains("glue.secret_key is required"));
Assertions.assertTrue(ex.getMessage().contains("glue.access_key and glue.secret_key must be set together"));
}

@Test
Expand All @@ -82,7 +82,7 @@ void testMissingEndpointThrows() {
IllegalArgumentException.class,
() -> AWSGlueMetaStoreBaseProperties.of(props)
);
Assertions.assertTrue(ex.getMessage().contains("glue.endpoint is required"));
Assertions.assertTrue(ex.getMessage().contains("At least one of glue.endpoint or glue.region must be set"));
}

@Test
Expand All @@ -108,4 +108,14 @@ void testExtractRegionFailsWhenPatternMatchesButNoRegion() {
);
Assertions.assertTrue(ex.getMessage().contains("Invalid AWS Glue endpoint"));
}

@Test
void testIamRole() {
    final String roleArn = "arn:aws:iam::1001:role/doris-glue-role";

    // Start from the shared valid property set, then swap the static keys for a role ARN.
    Map<String, String> roleOnlyProps = baseValidProps();
    roleOnlyProps.remove("glue.access_key");
    roleOnlyProps.remove("glue.secret_key");
    roleOnlyProps.put("glue.role_arn", roleArn);

    // Construction must succeed without access/secret keys and expose the configured role.
    AWSGlueMetaStoreBaseProperties parsed = AWSGlueMetaStoreBaseProperties.of(roleOnlyProps);
    Assertions.assertEquals(roleArn, parsed.glueIAMRole);
}
}
Loading