Skip to content

Commit

Permalink
[ServiceBus] add delay between infinite retry cycles (Azure#20316)
Browse files Browse the repository at this point in the history
* [ServiceBus] add delay between infinite retry cycles

Errors that are not retryable will be re-thrown out of the normal retry() call.
However, we don't have any delay before restarting the retry cycle.

This PR adds a delay before continuing the infinite retry cycles.

* Add a test and update changelog

also fixed the issue where we don't use `maxRetryDelayInMs` in Fixed retry
mode.

* Address CR feedback

* Update sdk/servicebus/service-bus/CHANGELOG.md

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>

* Revert max delay limit in Fixed retry mode

as the setting only applies to Exponential retry mode.

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>
  • Loading branch information
jeremymeng and ramya-rao-a authored Feb 12, 2022
1 parent fcec6a5 commit d5e4636
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 1 deletion.
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 7.5.0 (2022-02-08)
## 7.5.0 (2022-02-14)

### Features Added

Expand All @@ -10,6 +10,7 @@
### Bugs Fixed

- The `processError` callback to `subscribe()` was previously called only for errors on setting up the receiver, errors on message settlement or message lock renewal and not for errors on AMQP link or session. This is now fixed. [PR #19189](https://github.com/Azure/azure-sdk-for-js/pull/19189)
- Fix an issue where we don't respect retry options before starting the next retry cycle when using the `subscribe()` method. [PR #20316](https://github.com/Azure/azure-sdk-for-js/pull/20316)

## 7.4.0 (2021-11-08)

Expand Down
55 changes: 55 additions & 0 deletions sdk/servicebus/service-bus/src/receivers/receiverCommon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ import {
import { DispositionStatusOptions } from "../core/managementClient";
import { ConnectionContext } from "../connectionContext";
import {
Constants,
ErrorNameConditionMapper,
delay,
retry,
RetryConfig,
RetryMode,
RetryOperationType,
RetryOptions,
} from "@azure/core-amqp";
Expand Down Expand Up @@ -279,6 +282,27 @@ export interface RetryForeverArgs<T> {
logPrefix: string;
}

/**
* Calculates delay between retries, in milliseconds.
*/
function calculateDelay(
attemptCount: number,
retryDelayInMs: number,
maxRetryDelayInMs: number,
mode: RetryMode
): number {
if (mode === RetryMode.Exponential) {
const boundedRandDelta =
retryDelayInMs * 0.8 +
Math.floor(Math.random() * (retryDelayInMs * 1.2 - retryDelayInMs * 0.8));

const incrementDelta = boundedRandDelta * (Math.pow(2, attemptCount) - 1);
return Math.min(incrementDelta, maxRetryDelayInMs);
}

return retryDelayInMs;
}

/**
* Retry infinitely until success, reporting in between retry attempts.
*
Expand All @@ -293,6 +317,19 @@ export async function retryForever<T>(
retryFn: typeof retry = retry
): Promise<T> {
let numRetryCycles = 0;
const config = args.retryConfig;
if (!config.retryOptions) {
config.retryOptions = {};
}
if (!config.retryOptions.retryDelayInMs || config.retryOptions.retryDelayInMs < 0) {
config.retryOptions.retryDelayInMs = Constants.defaultDelayBetweenOperationRetriesInMs;
}
if (!config.retryOptions.maxRetryDelayInMs || config.retryOptions.maxRetryDelayInMs < 0) {
config.retryOptions.maxRetryDelayInMs = Constants.defaultMaxDelayForExponentialRetryInMs;
}
if (!config.retryOptions.mode) {
config.retryOptions.mode = RetryMode.Fixed;
}

// The retries are broken up into cycles, giving the user some control over how often
// we actually attempt to retry.
Expand Down Expand Up @@ -325,6 +362,24 @@ export async function retryForever<T>(
args.retryConfig
);

const delayInMs = calculateDelay(
numRetryCycles,
config.retryOptions.retryDelayInMs,
config.retryOptions.maxRetryDelayInMs,
config.retryOptions.mode
);
logger.verbose(
"[%s] Sleeping for %d milliseconds for '%s'.",
config.connectionId,
delayInMs,
config.operationType
);
await delay<void>(
delayInMs,
config.abortSignal,
"Retry cycle has been cancelled by the user."
);

continue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ describe("shared receiver code", () => {
},
connectionId: "id",
operationType: RetryOperationType.connection,
retryOptions: {
retryDelayInMs: 2000,
},
},
},
fakeRetry
Expand All @@ -290,6 +293,68 @@ describe("shared receiver code", () => {

assert.equal(numRetryCalls, 2 + 1);
});

it("respects retry options", async () => {
const errorMessages: string[] = [];
const errorCount = 3;
let numRetryCalls = 0;

const fakeRetry = async <T>(): Promise<T> => {
++numRetryCalls;

if (numRetryCalls < errorCount + 1) {
// force retry<> to get called ${errorCount} times (because
// we "failed" and threw exceptions and 1 more time where
// we succeed.
throw new Error(`Attempt ${numRetryCalls}: Force another call of retry<>`);
}

return Promise.resolve({} as T);
};

const retryDelayInMs = 2000;
let previousAttemptTime = Date.now();
await retryForever(
{
logPrefix: "logPrefix",
logger: logger,
onError: (err) => {
errorMessages.push(err.message);
if (numRetryCalls > 1) {
// not the first attempt
const currentTime = Date.now();
if (currentTime - previousAttemptTime < retryDelayInMs) {
errorMessages.push(
`Unexpected, Should've waited at least ${retryDelayInMs} between attempts`
);
}
previousAttemptTime = currentTime;
}
},
retryConfig: {
operation: async () => {
++numRetryCalls;

return 1;
},
connectionId: "id",
operationType: RetryOperationType.connection,
retryOptions: {
retryDelayInMs,
},
},
},
fakeRetry
);

assert.deepEqual(errorMessages, [
"Attempt 1: Force another call of retry<>",
"Attempt 2: Force another call of retry<>",
"Attempt 3: Force another call of retry<>",
]);

assert.equal(numRetryCalls, errorCount + 1);
});
});
});

Expand Down

0 comments on commit d5e4636

Please sign in to comment.