Skip to content

Commit

Permalink
Fix Retry middleware not releasing retry token (#1560)
Browse files Browse the repository at this point in the history
Updates the Retry middleware to release the retry token, on subsequent
attempts. This fixes #1413, and is based on PR #1424.
  • Loading branch information
jasdel authored Jan 14, 2022
1 parent e4d9a88 commit 4741932
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 125 deletions.
8 changes: 8 additions & 0 deletions .changelog/781710a7ecb24b9290b2642bd90b42c9.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "781710a7-ecb2-4b92-90b2-642bd90b42c9",
"type": "bugfix",
"description": "Updates the Retry middleware to release the retry token, on subsequent attempts. This fixes #1413, and is based on PR #1424",
"modules": [
"."
]
}
123 changes: 81 additions & 42 deletions aws/retry/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/aws/smithy-go/transport/http"
)

// RequestCloner is a function that can take an input request type and clone the request
// for use in a subsequent retry attempt
// RequestCloner is a function that can take an input request type and clone
// the request for use in a subsequent retry attempt.
type RequestCloner func(interface{}) interface{}

type retryMetadata struct {
Expand All @@ -27,11 +27,12 @@ type retryMetadata struct {
AttemptClockSkew time.Duration
}

// Attempt is a Smithy FinalizeMiddleware that handles retry attempts using the provided
// Retryer implementation
// Attempt is a Smithy Finalize middleware that handles retry attempts using
// the provided Retryer implementation.
type Attempt struct {
// Enable the logging of retry attempts performed by the SDK.
// This will include logging retry attempts, unretryable errors, and when max attempts are reached.
// Enable the logging of retry attempts performed by the SDK. This will
// include logging retry attempts, unretryable errors, and when max
// attempts are reached.
LogAttempts bool

retryer aws.Retryer
Expand Down Expand Up @@ -59,21 +60,24 @@ func (r Attempt) logf(logger logging.Logger, classification logging.Classificati
logger.Logf(classification, format, v...)
}

// HandleFinalize utilizes the provider Retryer implementation to attempt retries over the next handler
func (r Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
// HandleFinalize utilizes the provider Retryer implementation to attempt
// retries over the next handler
func (r *Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
out smithymiddle.FinalizeOutput, metadata smithymiddle.Metadata, err error,
) {
var attemptNum int
var attemptClockSkew time.Duration
var attemptResults AttemptResults

maxAttempts := r.retryer.MaxAttempts()
releaseRetryToken := nopRelease

for {
attemptNum++
attemptInput := in
attemptInput.Request = r.requestCloner(attemptInput.Request)

// Record the metadata for the for attempt being started.
attemptCtx := setRetryMetadata(ctx, retryMetadata{
AttemptNum: attemptNum,
AttemptTime: sdk.NowTime().UTC(),
Expand All @@ -82,23 +86,20 @@ func (r Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInp
})

var attemptResult AttemptResult
out, attemptResult, releaseRetryToken, err = r.handleAttempt(attemptCtx, attemptInput, releaseRetryToken, next)
attemptClockSkew, _ = awsmiddle.GetAttemptSkew(attemptResult.ResponseMetadata)

out, attemptResult, err = r.handleAttempt(attemptCtx, attemptInput, next)

var ok bool
attemptClockSkew, ok = awsmiddle.GetAttemptSkew(attemptResult.ResponseMetadata)
if !ok {
attemptClockSkew = 0
}

// AttempResult Retried states that the attempt was not successful, and
// should be retried.
shouldRetry := attemptResult.Retried

// add attempt metadata to list of all attempt metadata
// Add attempt metadata to list of all attempt metadata
attemptResults.Results = append(attemptResults.Results, attemptResult)

if !shouldRetry {
// Ensure the last response's metadata is used as the bases for result
// metadata returned by the stack.
// metadata returned by the stack. The Slice of attempt results
// will be added to this cloned metadata.
metadata = attemptResult.ResponseMetadata.Clone()

break
Expand All @@ -110,26 +111,36 @@ func (r Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInp
}

// handleAttempt handles an individual request attempt.
func (r Attempt) handleAttempt(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
out smithymiddle.FinalizeOutput, attemptResult AttemptResult, err error,
func (r *Attempt) handleAttempt(
ctx context.Context, in smithymiddle.FinalizeInput, releaseRetryToken func(error) error, next smithymiddle.FinalizeHandler,
) (
out smithymiddle.FinalizeOutput, attemptResult AttemptResult, _ func(error) error, err error,
) {
defer func() {
attemptResult.Err = err
}()

relRetryToken := r.retryer.GetInitialToken()
//------------------------------
// Get Initial (aka Send) Token
//------------------------------
releaseInitialToken := r.retryer.GetInitialToken()

//------------------------------
// Send Attempt
//------------------------------
logger := smithymiddle.GetLogger(ctx)
service, operation := awsmiddle.GetServiceID(ctx), awsmiddle.GetOperationName(ctx)

retryMetadata, _ := getRetryMetadata(ctx)
attemptNum := retryMetadata.AttemptNum
maxAttempts := retryMetadata.MaxAttempts

// Following attempts must ensure the request payload stream starts in a
// rewound state.
if attemptNum > 1 {
if rewindable, ok := in.Request.(interface{ RewindStream() error }); ok {
if rewindErr := rewindable.RewindStream(); rewindErr != nil {
err = fmt.Errorf("failed to rewind transport stream for retry, %w", rewindErr)
return out, attemptResult, err
return out, attemptResult, nopRelease, err
}
}

Expand All @@ -140,51 +151,78 @@ func (r Attempt) handleAttempt(ctx context.Context, in smithymiddle.FinalizeInpu
out, metadata, err = next.HandleFinalize(ctx, in)
attemptResult.ResponseMetadata = metadata

if releaseError := relRetryToken(err); releaseError != nil && err != nil {
err = fmt.Errorf("failed to release token after request error, %w", err)
return out, attemptResult, err
//------------------------------
// Bookkeeping
//------------------------------
// Release the initial send token based on the state of the attempt's error (if any).
if releaseError := releaseInitialToken(err); releaseError != nil && err != nil {
err = fmt.Errorf("failed to release initial token after request error, %w", err)
return out, attemptResult, nopRelease, err
}

// Release the retry token based on the state of the attempt's error (if any).
if releaseError := releaseRetryToken(err); releaseError != nil && err != nil {
err = fmt.Errorf("failed to release retry token after request error, %w", err)
return out, attemptResult, nopRelease, err
}
// If there was no error making the attempt, nothing further to do. There
// will be nothing to retry.
if err == nil {
return out, attemptResult, err
return out, attemptResult, nopRelease, err
}

//------------------------------
// Is Retryable and Should Retry
//------------------------------
// If the attempt failed with an unretryable error, nothing further to do
// but return, and inform the caller about the terminal failure.
retryable := r.retryer.IsErrorRetryable(err)
if !retryable {
r.logf(logger, logging.Debug, "request failed with unretryable error %v", err)
return out, attemptResult, err
return out, attemptResult, nopRelease, err
}

// set retryable to true
attemptResult.Retryable = true

// Once the maximum number of attempts have been exhausted there is nothing
// further to do other than inform the caller about the terminal failure.
if maxAttempts > 0 && attemptNum >= maxAttempts {
r.logf(logger, logging.Debug, "max retry attempts exhausted, max %d", maxAttempts)
err = &MaxAttemptsError{
Attempt: attemptNum,
Err: err,
}
return out, attemptResult, err
return out, attemptResult, nopRelease, err
}

relRetryToken, reqErr := r.retryer.GetRetryToken(ctx, err)
if reqErr != nil {
return out, attemptResult, reqErr
//------------------------------
// Get Retry (aka Retry Quota) Token
//------------------------------
// Get a retry token that will be released after the
releaseRetryToken, retryTokenErr := r.retryer.GetRetryToken(ctx, err)
if retryTokenErr != nil {
return out, attemptResult, nopRelease, retryTokenErr
}

//------------------------------
// Retry Delay and Sleep
//------------------------------
// Get the retry delay before another attempt can be made, and sleep for
// that time. Potentially early exist if the sleep is canceled via the
// context.
retryDelay, reqErr := r.retryer.RetryDelay(attemptNum, err)
if reqErr != nil {
return out, attemptResult, reqErr
return out, attemptResult, releaseRetryToken, reqErr
}

if reqErr = sdk.SleepWithContext(ctx, retryDelay); reqErr != nil {
err = &aws.RequestCanceledError{Err: reqErr}
return out, attemptResult, err
return out, attemptResult, releaseRetryToken, err
}

// The request should be re-attempted.
attemptResult.Retried = true

return out, attemptResult, err
return out, attemptResult, releaseRetryToken, err
}

// MetricsHeader attaches SDK request metric header for retries to the transport
Expand All @@ -195,7 +233,7 @@ func (r *MetricsHeader) ID() string {
return "RetryMetricsHeader"
}

// HandleFinalize attaches the sdk request metric header to the transport layer
// HandleFinalize attaches the SDK request metric header to the transport layer
func (r MetricsHeader) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
out smithymiddle.FinalizeOutput, metadata smithymiddle.Metadata, err error,
) {
Expand Down Expand Up @@ -251,13 +289,14 @@ func setRetryMetadata(ctx context.Context, metadata retryMetadata) context.Conte
return middleware.WithStackValue(ctx, retryMetadataKey{}, metadata)
}

// AddRetryMiddlewaresOptions is the set of options that can be passed to AddRetryMiddlewares for configuring retry
// associated middleware.
// AddRetryMiddlewaresOptions is the set of options that can be passed to
// AddRetryMiddlewares for configuring retry associated middleware.
type AddRetryMiddlewaresOptions struct {
Retryer aws.Retryer

// Enable the logging of retry attempts performed by the SDK.
// This will include logging retry attempts, unretryable errors, and when max attempts are reached.
// Enable the logging of retry attempts performed by the SDK. This will
// include logging retry attempts, unretryable errors, and when max
// attempts are reached.
LogRetryAttempts bool
}

Expand Down
Loading

0 comments on commit 4741932

Please sign in to comment.