Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Pass timeout to server when accepting next session #23210

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions sdk/servicebus/service-bus/src/core/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ export function onMessageSettled(
}
}

// Placed in Service Bus for now and can be promoted to core-amqp if also useful for Event Hubs in the future.
const timeoutName = `${Constants.vendorString}:timeout`;
/**
* Creates the options that need to be specified while creating an AMQP receiver link.
*
Expand All @@ -87,8 +89,13 @@ export function createReceiverOptions(
receiveMode: ReceiveMode,
source: Source,
clientId: string,
handlers: ReceiverHandlers
handlers: ReceiverHandlers,
timeoutInMs?: number
): ReceiverOptions {
const properties =
timeoutInMs !== undefined
? { [Constants.receiverIdentifierName]: clientId, [timeoutName]: timeoutInMs }
: { [Constants.receiverIdentifierName]: clientId };
const rcvrOptions: ReceiverOptions = {
name,
// "autoaccept" being true in the "receiveAndDelete" mode sets the "settled" flag to true on the deliveries
Expand All @@ -101,7 +108,7 @@ export function createReceiverOptions(
source,
target: clientId,
credit_window: 0,
properties: { [Constants.receiverIdentifierName]: clientId },
properties,
...handlers,
};

Expand Down
55 changes: 48 additions & 7 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,12 @@ export class MessageSession extends LinkEntity<Receiver> {
/**
* Creates a new AMQP receiver under a new AMQP session.
*/
private async _init(abortSignal?: AbortSignalLike): Promise<void> {
private async _init(
opts: { abortSignal?: AbortSignalLike; timeoutInMs?: number } = {}
): Promise<void> {
try {
const options = this._createMessageSessionOptions(this.identifier);
await this.initLink(options, abortSignal);
const sessionOptions = this._createMessageSessionOptions(this.identifier, opts.timeoutInMs);
await this.initLink(sessionOptions, opts.abortSignal);

if (this.link == null) {
throw new Error("INTERNAL ERROR: failed to create receiver but without an error.");
Expand Down Expand Up @@ -302,7 +304,11 @@ export class MessageSession extends LinkEntity<Receiver> {
this.sessionId,
this.sessionLockedUntilUtc.toISOString()
);
logger.verbose("%s Receiver created with receiver options: %O", this.logPrefix, options);
logger.verbose(
"%s Receiver created with receiver options: %O",
this.logPrefix,
sessionOptions
);
if (!this._context.messageSessions[this.name]) {
this._context.messageSessions[this.name] = this;
}
Expand All @@ -327,7 +333,7 @@ export class MessageSession extends LinkEntity<Receiver> {
/**
* Creates the options that need to be specified while creating an AMQP receiver link.
*/
private _createMessageSessionOptions(clientId: string): ReceiverOptions {
private _createMessageSessionOptions(clientId: string, timeoutInMs?: number): ReceiverOptions {
const rcvrOptions: ReceiverOptions = createReceiverOptions(
this.name,
this.receiveMode,
Expand All @@ -348,7 +354,8 @@ export class MessageSession extends LinkEntity<Receiver> {
onError: this._onAmqpError,
onSessionError: this._onSessionError,
onSettled: this._onSettled,
}
},
timeoutInMs
);

return rcvrOptions;
Expand Down Expand Up @@ -943,7 +950,41 @@ export class MessageSession extends LinkEntity<Receiver> {
): Promise<MessageSession> {
throwErrorIfConnectionClosed(context);
const messageSession = new MessageSession(identifier, context, entityPath, sessionId, options);
await messageSession._init(options?.abortSignal);
let timeoutInMs: number | undefined;
// Only passing client timeout in link properties for accepting next available
// session as this is the only long-polling scenario.
if (sessionId === undefined) {
timeoutInMs = options.retryOptions?.timeoutInMs ?? Constants.defaultOperationTimeoutInMs;
// The number of milliseconds to use as the basis for calculating a random jitter amount
// opening receiver links. This is intended to ensure that multiple
// session operations don't timeout at the same exact moment.
const openReceiveLinkBaseJitterInMs = 100;
// The amount of time to subtract from the client timeout when setting the server timeout when attempting to
// accept the next available session. This will decrease the likelihood that the client times out before receiving a
// response from the server.
const openReceiveLinkBufferInMs = 20;
// The amount minimum threshold for the server timeout for which we will subtract the "openReceiveLinkBufferInMs".
// If the server timeout is less than this, we will not subtract the additional buffer.
const openReceiveLinkBufferThresholdInMs = 1000;
// Subtract a random amount up to 100ms from the operation timeout as the jitter when attempting to open next available session link.
// This prevents excessive resource usage when using high amounts of concurrency and accepting the next available session.
// Take the min of 1% of the total timeout and the base jitter amount so that we don't end up subtracting more than 1% of the total timeout.
const jitterBaseInMs = Math.min(timeoutInMs * 0.01, openReceiveLinkBaseJitterInMs);
// We set the operation timeout on the properties not only to include the jitter, but also because the server will otherwise
// restrict the maximum timeout to 1 minute and 5 seconds, regardless of the client timeout. We only do this for accepting next available
// session as this is the only long-polling scenario.
timeoutInMs = Math.floor(timeoutInMs - jitterBaseInMs * Math.random());
// Subtract an additional constant buffer to reduce the likelihood that the client times out before the service which leads to unnecessary
// network traffic. If the timeout is too short, we won't do this.
if (timeoutInMs >= openReceiveLinkBufferThresholdInMs) {
timeoutInMs -= openReceiveLinkBufferInMs;
}
}

await messageSession._init({
abortSignal: options?.abortSignal,
timeoutInMs,
});
return messageSession;
}

Expand Down