Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/multiple cred providers #111

Merged
merged 3 commits into from
Oct 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public class KinesisClientLibConfigurator {
// Required properties
private static final String PROP_APP_NAME = "applicationName";
private static final String PROP_STREAM_NAME = "streamName";
private static final String PROP_CREDENTIALS_PROVIDER = "AWSCredentialsProvider";
private static final String PROP_CREDENTIALS_PROVIDER_KINESIS = "AWSCredentialsProvider";
private static final String PROP_CREDENTIALS_PROVIDER_DYNAMODB = "AWSCredentialsProviderDynamoDB";
private static final String PROP_CREDENTIALS_PROVIDER_CLOUDWATCH = "AWSCredentialsProviderCloudWatch";
private static final String PROP_WORKER_ID = "workerId";

private Map<Class<?>, IPropertyValueDecoder<?>> classToDecoder;
Expand Down Expand Up @@ -107,7 +109,7 @@ public KinesisClientLibConfiguration getConfiguration(Properties properties) {
String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME));
String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME));
AWSCredentialsProvider provider =
awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER));
awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER_KINESIS));

if (applicationName == null || applicationName.isEmpty()) {
throw new IllegalArgumentException("Value of applicationName should be explicitly provided.");
Expand All @@ -116,6 +118,24 @@ public KinesisClientLibConfiguration getConfiguration(Properties properties) {
throw new IllegalArgumentException("Value of streamName should be explicitly provided.");
}

// Decode the DynamoDB credentials provider if it exists. If not use the Kinesis credentials provider.
AWSCredentialsProvider providerDynamoDB;
String propCredentialsProviderDynamoDBValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_DYNAMODB);
if (propCredentialsProviderDynamoDBValue == null) {
providerDynamoDB = provider;
} else {
providerDynamoDB = awsCPPropGetter.decodeValue(propCredentialsProviderDynamoDBValue);
}

// Decode the CloudWatch credentials provider if it exists. If not use the Kinesis credentials provider.
AWSCredentialsProvider providerCloudWatch;
String propCredentialsProviderCloudWatchValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_CLOUDWATCH);
if (propCredentialsProviderCloudWatchValue == null) {
providerCloudWatch = provider;
} else {
providerCloudWatch = awsCPPropGetter.decodeValue(propCredentialsProviderCloudWatchValue);
}

// Allow customer not to provide workerId or to provide empty worker id.
String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID));
if (workerId == null || workerId.isEmpty()) {
Expand All @@ -125,13 +145,13 @@ public KinesisClientLibConfiguration getConfiguration(Properties properties) {
}

KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration(applicationName, streamName, provider, workerId);
new KinesisClientLibConfiguration(applicationName, streamName, provider, providerDynamoDB, providerCloudWatch, workerId);

Set<String> requiredNames =
new HashSet<String>(Arrays.asList(PROP_STREAM_NAME,
PROP_APP_NAME,
PROP_WORKER_ID,
PROP_CREDENTIALS_PROVIDER));
PROP_CREDENTIALS_PROVIDER_KINESIS));

// Set all the variables that are not used for constructor.
for (Object keyObject : properties.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public class KinesisClientLibConfiguratorTest {
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider";
private String credentialName2 =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider";
private String credentialNameKinesis =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis";
private String credentialNameDynamoDB =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB";
private String credentialNameCloudWatch =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch";
private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();

@Test
Expand Down Expand Up @@ -329,6 +335,74 @@ public void testWithAWSCredentialsFailed() {
}
}

@Test
public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatch() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test appears to succeed with the current code. It might make sense to test with different class names for each credential provider. The credential decoder has the ability to load custom class names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pfifer Yes my tests were terrible. Sorry about that. How are these?

String test = StringUtils.join(new String[] {
"streamName = a",
"applicationName = b",
"AWSCredentialsProvider = " + credentialNameKinesis,
"AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB,
"AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch,
"failoverTimeMillis = 100",
"shardSyncIntervalMillis = 500"
}, '\n');
InputStream input = new ByteArrayInputStream(test.getBytes());

// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
KinesisClientLibConfiguration config = configurator.getConfiguration(input);
try {
config.getKinesisCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("Kinesis credential providers should not fail.");
}
try {
config.getDynamoDBCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("DynamoDB credential providers should not fail.");
}
try {
config.getCloudWatchCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("CloudWatch credential providers should not fail.");
}
}

@Test
public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatchFailed() {
String test = StringUtils.join(new String[] {
"streamName = a",
"applicationName = b",
"AWSCredentialsProvider = " + credentialNameKinesis,
"AWSCredentialsProviderDynamoDB = " + credentialName1,
"AWSCredentialsProviderCloudWatch = " + credentialName1,
"failoverTimeMillis = 100",
"shardSyncIntervalMillis = 500"
}, '\n');
InputStream input = new ByteArrayInputStream(test.getBytes());

// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement

// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
KinesisClientLibConfiguration config = configurator.getConfiguration(input);
try {
config.getKinesisCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("Kinesis credential providers should not fail.");
}
try {
config.getDynamoDBCredentialsProvider().getCredentials();
fail("DynamoDB credential providers should fail.");
} catch (Exception e) {
// succeed
}
try {
config.getCloudWatchCredentialsProvider().getCredentials();
fail("CloudWatch credential providers should fail.");
} catch (Exception e) {
// succeed
}
}

/**
* This credentials provider will always succeed
*/
Expand All @@ -345,6 +419,84 @@ public void refresh() {

}

/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderKinesis implements AWSCredentialsProvider {

@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}

@Override
public String getAWSSecretKey() {
return "";
}
};
}

@Override
public void refresh() {
}

}

/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderDynamoDB implements AWSCredentialsProvider {

@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}

@Override
public String getAWSSecretKey() {
return "";
}
};
}

@Override
public void refresh() {
}

}

/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderCloudWatch implements AWSCredentialsProvider {

@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}

@Override
public String getAWSSecretKey() {
return "";
}
};
}

@Override
public void refresh() {
}

}

/**
* This credentials provider will always fail
*/
Expand Down