Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Pass timeout to server when accepting next session #23210

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions sdk/servicebus/service-bus/src/core/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ export function onMessageSettled(
}
}

// Placed in Service Bus for now and can be promoted to core-amqp if also useful for Event Hubs in the future.
const timeoutName = `${Constants.vendorString}:timeout`;
/**
* Creates the options that need to be specified while creating an AMQP receiver link.
*
Expand All @@ -87,8 +89,13 @@ export function createReceiverOptions(
receiveMode: ReceiveMode,
source: Source,
clientId: string,
handlers: ReceiverHandlers
handlers: ReceiverHandlers,
timeoutInMs?: number
): ReceiverOptions {
const properties =
timeoutInMs !== undefined
? { [Constants.receiverIdentifierName]: clientId, [timeoutName]: timeoutInMs }
: { [Constants.receiverIdentifierName]: clientId };
const rcvrOptions: ReceiverOptions = {
name,
// "autoaccept" being true in the "receiveAndDelete" mode sets the "settled" flag to true on the deliveries
Expand All @@ -101,7 +108,7 @@ export function createReceiverOptions(
source,
target: clientId,
credit_window: 0,
properties: { [Constants.receiverIdentifierName]: clientId },
properties,
...handlers,
};

Expand Down
17 changes: 15 additions & 2 deletions sdk/servicebus/service-bus/src/serviceBusClient.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { ConnectionConfig } from "@azure/core-amqp";
import { ConnectionConfig, Constants } from "@azure/core-amqp";
import { TokenCredential, NamedKeyCredential, SASCredential } from "@azure/core-auth";
import {
ServiceBusClientOptions,
Expand Down Expand Up @@ -438,6 +438,19 @@ export class ServiceBusClient {
options3
);

const operationTimeout =
this._clientOptions?.retryOptions?.timeoutInMs ?? Constants.defaultOperationTimeoutInMs;
// The number of milliseconds to use as the basis for calculating a random jitter amount
// opening receiver links. This is intended to ensure that multiple
// session operations don't timeout at the same exact moment.
const jitterBaseInMs = Math.min(operationTimeout * 0.01, 100);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The jitter may not be as useful in JS since we don't have multiple threads.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well there is still the concept of concurrency, no? It may all be occurring on one thread but I think spreading out the timeouts would still help alleviate context switching.

// Subtract a random amount up to 100ms from the operation timeout as the jitter when attempting to open next available session link.
// This prevents excessive resource usage when using high amounts of concurrency and accepting the next available session.
// Take the min of 1% of the total timeout and the base jitter amount so that we don't end up subtracting more than 1% of the total timeout.
const timeoutWithJitterInMs = operationTimeout - jitterBaseInMs * Math.random();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made an update to the .NET implementation to subtract an additional 20ms as long as the timeout is over 1 s. Azure/azure-sdk-for-net#31069

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I port this over too and add some fix so this really only applies to acceptNextSession()

// We set the operation timeout on the properties not only to include the jitter, but also because the server will otherwise
// restrict the maximum timeout to 1 minute and 5 seconds, regardless of the client timeout. We only do this for accepting next available
// session as this is the only long-polling scenario.
const messageSession = await MessageSession.create(
ensureValidIdentifier(entityPath, options?.identifier),
this._connectionContext,
Expand All @@ -447,7 +460,7 @@ export class ServiceBusClient {
maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs,
receiveMode,
abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions,
retryOptions: { ...this._clientOptions.retryOptions, timeoutInMs: timeoutWithJitterInMs },
skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false,
}
);
Expand Down
24 changes: 17 additions & 7 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,12 @@ export class MessageSession extends LinkEntity<Receiver> {
/**
* Creates a new AMQP receiver under a new AMQP session.
*/
private async _init(abortSignal?: AbortSignalLike): Promise<void> {
private async _init(
opts: { abortSignal?: AbortSignalLike; timeoutInMs?: number } = {}
): Promise<void> {
try {
const options = this._createMessageSessionOptions(this.identifier);
await this.initLink(options, abortSignal);
const sessionOptions = this._createMessageSessionOptions(this.identifier, opts.timeoutInMs);
await this.initLink(sessionOptions, opts.abortSignal);

if (this.link == null) {
throw new Error("INTERNAL ERROR: failed to create receiver but without an error.");
Expand Down Expand Up @@ -302,7 +304,11 @@ export class MessageSession extends LinkEntity<Receiver> {
this.sessionId,
this.sessionLockedUntilUtc.toISOString()
);
logger.verbose("%s Receiver created with receiver options: %O", this.logPrefix, options);
logger.verbose(
"%s Receiver created with receiver options: %O",
this.logPrefix,
sessionOptions
);
if (!this._context.messageSessions[this.name]) {
this._context.messageSessions[this.name] = this;
}
Expand All @@ -327,7 +333,7 @@ export class MessageSession extends LinkEntity<Receiver> {
/**
* Creates the options that need to be specified while creating an AMQP receiver link.
*/
private _createMessageSessionOptions(clientId: string): ReceiverOptions {
private _createMessageSessionOptions(clientId: string, timeoutInMs?: number): ReceiverOptions {
const rcvrOptions: ReceiverOptions = createReceiverOptions(
this.name,
this.receiveMode,
Expand All @@ -348,7 +354,8 @@ export class MessageSession extends LinkEntity<Receiver> {
onError: this._onAmqpError,
onSessionError: this._onSessionError,
onSettled: this._onSettled,
}
},
timeoutInMs
);

return rcvrOptions;
Expand Down Expand Up @@ -943,7 +950,10 @@ export class MessageSession extends LinkEntity<Receiver> {
): Promise<MessageSession> {
throwErrorIfConnectionClosed(context);
const messageSession = new MessageSession(identifier, context, entityPath, sessionId, options);
await messageSession._init(options?.abortSignal);
await messageSession._init({
abortSignal: options?.abortSignal,
timeoutInMs: options.retryOptions?.timeoutInMs,
});
return messageSession;
}

Expand Down