Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries if loading credentials fails with a SdkClientException #60

Merged
merged 10 commits into from
Mar 3, 2022
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,39 @@ The Default Credential Provider Chain must contain the permissions necessary to
For example, if the client is an EC2 instance, its instance profile should have permission to assume the
`msk_client_role`.

### Retries while getting credentials
In some scenarios the IAM credentials might be transiently unavailable. This will cause the connection to fail, which
might in some cases cause the client application to stop.
So, in version `1.1.3` the library retries loading the credentials when it gets an `SkdClientException` (which wraps
most `AWS SDK` client side exceptions). Since the retries do not impact the fault-free path and we had heard of user
issues around random failures loading credentials (e.g.: [#59](https://github.com/aws/aws-msk-iam-auth/issues/59), maybe
[#51](https://github.com/aws/aws-msk-iam-auth/issues/51) ), we decided to change the default behavior
to retry a maximum of `3` times. It exponentially backs off with full jitter upto a max-delay of `2000 ms`.

The maximum number of retries and the maximum back off period can be set:
```
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsMaxRetries="7" awsMaxBackOffTimeMs="500";
```
This sets the maximum number of retries to `7` and the maximum back off time to `500 ms`.

The retries can be turned off completely by setting `awsMaxRetries` to `"0"`.

## Troubleshooting

### IAMClientCallbackHandler could not be found

A Kafka client configured to use `AWS_MSK_IAM` may see an error that the `IAMClientCallbackHandler` cannot be found:

```
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value
software.amazon.msk.auth.iam.IAMClientCallbackHandler for configuration sasl.client.callback.handler.class:
Class software.amazon.msk.auth.iam.IAMClientCallbackHandler could not be found.
```

That means that this `aws-msk-iam-auth` library is not on the classpath of the Kafka client. Please add the `aws-msk-iam-auth` library
to the classpath and try again.


### Finding out which identity is being used

You may receive an `Access denied` error and there may be some doubt as to which credential is being exactly used. The
Expand Down Expand Up @@ -186,6 +217,12 @@ library is actually used by the Kafka producer and consumer clients and not the
it should be placed in a location that is on the classpath but outside the plugin path. This should ensure that Kafka
Connect's `PluginClassLoader` is not used to load classes for the `aws-msk-iam-auth` library.

### Dependency mismatch
If you are building the library from source using `gradle build` and copying it over to a Kafka client on that or
another machine, there is a chance that some dependencies may not be available on the Kafka client machine. In that
case, you could instead generate and use the uber jar that packages all the necessary runtime dependencies by
running `gradle shadowJar`.

## Details
This library introduces a new SASL mechanism called `AWS_MSK_IAM`. The `IAMLoginModule` is used to register the
`IAMSaslClientProvider` as a `Provider` for the `AWS_MSK_IAM` mechanism. The `IAMSaslClientProvider` is used to
Expand Down Expand Up @@ -432,6 +469,12 @@ public static String UriEncode(CharSequence input, boolean encodeSlash) {

## Release Notes

### Release 1.1.3

- Add retries if loading credential fails with client side errors.
- If AWS STS is not accessible for identifying the credential when `awsDebugCreds=true`, do not fail the connection.
- Update Troubleshooting section in README.

### Release 1.1.2

- Update log4j version in test dependencies to CVE-2021-44832
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package software.amazon.msk.auth.iam.internals;

import com.amazonaws.SdkBaseException;
import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
Expand All @@ -23,30 +25,44 @@
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
import com.amazonaws.retry.PredefinedBackoffStrategies;
import com.amazonaws.retry.v2.AndRetryCondition;
import com.amazonaws.retry.v2.MaxNumberOfRetriesCondition;
import com.amazonaws.retry.v2.RetryOnExceptionsCondition;
import com.amazonaws.retry.v2.RetryPolicy;
import com.amazonaws.retry.v2.RetryPolicyContext;
import com.amazonaws.retry.v2.SimpleRetryPolicy;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest;
import com.amazonaws.services.securitytoken.model.GetCallerIdentityResult;
import lombok.AccessLevel;
import lombok.Getter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;


/**
* This AWS Credential Provider is used to load up AWS Credentials based on options provided on the Jaas config line.
* As as an example
* sasl.jaas.config = IAMLoginModule required awsProfileName={profile name};
* The currently supported options are:
* 1. A particular AWS Credential profile: awsProfileName={profile name}
* 2. A particular AWS IAM Role and optional AWS IAM role session name:
* awsRoleArn={IAM Role ARN}, awsRoleSessionName = {session name}
* 3. If no options is provided, the DefaultAWSCredentialsProviderChain is used.
* 2. A particular AWS IAM Role and optionally AWS IAM role session name and AWS region for the STS endpoint:
* awsRoleArn={IAM Role ARN}, awsRoleSessionName={session name}, awsStsRegion={region name}
* 3. Optional arguments to configure retries when we fail to load credentials:
* awsMaxRetries={Maximum number of retries}, awsMaxBackOffTimeMs={Maximum back off time between retries in ms}
* 4. Optional argument to help debug credentials used to establish connections:
* awsDebugCreds={true|false}
* 5. If no options is provided, the DefaultAWSCredentialsProviderChain is used.
* The DefaultAWSCredentialProviderChain can be pointed to credentials in many different ways:
* <a href="https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html">Working with AWS Credentials</a>
*/
Expand All @@ -57,29 +73,49 @@ public class MSKCredentialProvider implements AWSCredentialsProvider, AutoClosea
private static final String AWS_ROLE_SESSION_KEY = "awsRoleSessionName";
private static final String AWS_STS_REGION = "awsStsRegion";
private static final String AWS_DEBUG_CREDS_KEY = "awsDebugCreds";
private static final String AWS_MAX_RETRIES = "awsMaxRetries";
private static final String AWS_MAX_BACK_OFF_TIME_MS = "awsMaxBackOffTimeMs";
private static final int DEFAULT_MAX_RETRIES = 3;
private static final int DEFAULT_MAX_BACK_OFF_TIME_MS = 5000;
private static final int BASE_DELAY = 500;

private final List<AutoCloseable> closeableProviders;
private final AWSCredentialsProvider compositeDelegate;
@Getter(AccessLevel.PACKAGE)
private final Boolean shouldDebugCreds;
private final String stsRegion;
private final RetryPolicy retryPolicy;

public MSKCredentialProvider(Map<String, ?> options) {
this(new ProviderBuilder(options));
}

MSKCredentialProvider(ProviderBuilder builder) {
this(builder.getProviders(), builder.shouldDebugCreds(), builder.getStsRegion());
this(builder.getProviders(), builder.shouldDebugCreds(), builder.getStsRegion(), builder.getMaxRetries(),
builder.getMaxBackOffTimeMs());
}

MSKCredentialProvider(List<AWSCredentialsProvider> providers, Boolean shouldDebugCreds, String stsRegion) {
MSKCredentialProvider(List<AWSCredentialsProvider> providers,
Boolean shouldDebugCreds,
String stsRegion,
int maxRetries,
int maxBackOffTimeMs) {
List<AWSCredentialsProvider> delegateList = new ArrayList<>(providers);
delegateList.add(getDefaultProvider());
compositeDelegate = new AWSCredentialsProviderChain(delegateList);
closeableProviders = providers.stream().filter(p -> p instanceof AutoCloseable).map(p -> (AutoCloseable) p)
.collect(Collectors.toList());
this.shouldDebugCreds = shouldDebugCreds;
this.stsRegion = stsRegion;
if (maxRetries > 0) {
this.retryPolicy = new SimpleRetryPolicy(
new AndRetryCondition(new RetryOnExceptionsCondition(Collections.singletonList(
SdkClientException.class)), new MaxNumberOfRetriesCondition(maxRetries)),
new PredefinedBackoffStrategies.FullJitterBackoffStrategy(BASE_DELAY, maxBackOffTimeMs));
} else {
this.retryPolicy = new SimpleRetryPolicy((c) -> false,
new PredefinedBackoffStrategies.FullJitterBackoffStrategy(BASE_DELAY, maxBackOffTimeMs));
}
}

//We want to override the ProfileCredentialsProvider with the EnhancedProfileCredentialsProvider
Expand All @@ -93,17 +129,63 @@ protected AWSCredentialsProviderChain getDefaultProvider() {

@Override
public AWSCredentials getCredentials() {
AWSCredentials credentials = compositeDelegate.getCredentials();
AWSCredentials credentials = loadCredentialsWithRetry();
if (credentials != null && shouldDebugCreds && log.isDebugEnabled()) {
logCallerIdentity(credentials);
}
return credentials;
}

private AWSCredentials loadCredentialsWithRetry() {
RetryPolicyContext retryPolicyContext = RetryPolicyContext.builder().build();
boolean shouldTry = true;
try {
while (shouldTry) {
try {
AWSCredentials credentials = compositeDelegate.getCredentials();
if (credentials == null) {
throw new SdkClientException("Composite delegate returned empty credentials.");
}
return credentials;
} catch (SdkBaseException se) {
log.warn("Exception loading credentials. Retry Attempts: {}",
retryPolicyContext.retriesAttempted(), se);
retryPolicyContext = createRetryPolicyContext(se, retryPolicyContext.retriesAttempted());
shouldTry = retryPolicy.shouldRetry(retryPolicyContext);
if (shouldTry) {
Thread.sleep(retryPolicy.computeDelayBeforeNextRetry(retryPolicyContext));
retryPolicyContext = createRetryPolicyContext(retryPolicyContext.exception(),
retryPolicyContext.retriesAttempted() + 1);
} else {
throw se;
}
}
}
throw new SdkClientException(
"loadCredentialsWithRetry in unexpected location " + retryPolicyContext.totalRequests(),
retryPolicyContext.exception());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for credentials.", ie);
}
}

private RetryPolicyContext createRetryPolicyContext(SdkBaseException sdkException, int retriesAttempted) {
return RetryPolicyContext.builder().exception(sdkException)
.retriesAttempted(retriesAttempted).build();
}

private void logCallerIdentity(AWSCredentials credentials) {
AWSSecurityTokenService stsClient = getStsClientForDebuggingCreds(credentials);
GetCallerIdentityResult response = stsClient.getCallerIdentity(new GetCallerIdentityRequest());
log.debug("The identity of the credentials is {}", response.toString());
try {
AWSSecurityTokenService stsClient = getStsClientForDebuggingCreds(credentials);
GetCallerIdentityResult response = stsClient.getCallerIdentity(new GetCallerIdentityRequest());
log.debug("The identity of the credentials is {}", response.toString());
} catch (Exception e) {
//If we run into an exception logging the caller identity, we should log the exception but
//continue running.
log.warn("Error identifying caller identity. If this is not transient, does this application have"
+ "access to AWS STS?", e);
}
}

AWSSecurityTokenService getStsClientForDebuggingCreds(AWSCredentials credentials) {
Expand Down Expand Up @@ -165,6 +247,17 @@ public String getStsRegion() {
.orElse("aws-global");
}

public int getMaxRetries() {
return Optional.ofNullable(optionsMap.get(AWS_MAX_RETRIES)).map(p -> (String) p).map(Integer::parseInt)
.orElse(DEFAULT_MAX_RETRIES);
}

public int getMaxBackOffTimeMs() {
return Optional.ofNullable(optionsMap.get(AWS_MAX_BACK_OFF_TIME_MS)).map(p -> (String) p)
.map(Integer::parseInt)
.orElse(DEFAULT_MAX_BACK_OFF_TIME_MS);
}

private Optional<EnhancedProfileCredentialsProvider> getProfileProvider() {
return Optional.ofNullable(optionsMap.get(AWS_PROFILE_NAME_KEY)).map(p -> {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -200,4 +293,5 @@ STSAssumeRoleSessionCredentialsProvider createSTSRoleCredentialProvider(String r
.build();
}
}

}
4 changes: 2 additions & 2 deletions src/main/resources/version.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#Updated on 2022-01-30T08:43:00Z
#Updated on 2022-02-20T20:43:00Z
platform=java
version=1.1.2
version=1.1.3-PRE
Loading