Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Adding in support for stopping a Subscription for session subscribe #9849

Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b1b07b6
Adding in support for stopping a Subscription.
richardpark-msft Jun 30, 2020
28c5cb1
There is no 'we'!
richardpark-msft Jun 30, 2020
63704d9
Adding in some more logging for the critical points.
richardpark-msft Jun 30, 2020
fa8e5bc
Merge branch 'richardpark-sb-track2-subscriber-close' of https://gith…
richardpark-msft Jun 30, 2020
fc14de0
Adding a bit more protection when adding credits to the receiver.
richardpark-msft Jun 30, 2020
e7bf7f4
Remove commented line.
richardpark-msft Jun 30, 2020
4093760
Add in Harsha's edit for log message.
richardpark-msft Jun 30, 2020
327d1fd
We can cleanup ALL the instances of using .addCredit()!
richardpark-msft Jun 30, 2020
049f9b5
Merge branch 'richardpark-sb-track2-subscriber-close' of https://gith…
richardpark-msft Jun 30, 2020
02f0a0c
Merge remote-tracking branch 'upstream/master' into richardpark-sb-tr…
richardpark-msft Jun 30, 2020
3818251
Remove comment - I _think_ we're okay with the this._receiver check o…
richardpark-msft Jun 30, 2020
3b008e5
Some new tests were added that don't properly clean up after themselves.
richardpark-msft Jul 1, 2020
4843f51
- Making it so session receivers can also close() a Subscription.
richardpark-msft Jul 1, 2020
39b1d53
Adding in changelog entry
richardpark-msft Jul 1, 2020
e3dd502
Merge remote-tracking branch 'upstream/master' into richardpark-sb-tr…
richardpark-msft Jul 1, 2020
9fcd6de
Merge remote-tracking branch 'upstream/master' into richardpark-sb-tr…
richardpark-msft Jul 2, 2020
e68f4c6
Add returns to the addCredit method to explain what the boolean means.
richardpark-msft Jul 2, 2020
fa4e1fa
Added @returns to the subscribe method to describe what the close() m…
richardpark-msft Jul 2, 2020
2b64f9e
Save some money on the bottom line and eliminate unneeded semi-colon …
richardpark-msft Jul 2, 2020
cd9bb47
Merge remote-tracking branch 'upstream/master' into richardpark-sb-tr…
richardpark-msft Jul 2, 2020
cde1726
Remove duplicate function.
richardpark-msft Jul 2, 2020
259cae7
Merge remote-tracking branch 'upstream/master' into richardpark-sb-tr…
richardpark-msft Jul 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
- Adds abortSignal support throughout Sender and non-session Receivers.
[PR 9233](https://github.com/Azure/azure-sdk-for-js/pull/9233)
[PR 9284](https://github.com/Azure/azure-sdk-for-js/pull/9284)
- (Receiver|SessionReceiver).subscribe() now returns a closeable object which will stop new messages from arriving
but still leave the receiver open so they can be settled via methods like complete().
[PR 9802](https://github.com/Azure/azure-sdk-for-js/pull/9802)
[PR 9849](https://github.com/Azure/azure-sdk-for-js/pull/9849)
- Standardized methods on senders and receivers to use the `Messages` suffix and deal with multiple messages rather than
have dedicated methods to deal with a single message. [PR 9678](https://github.com/Azure/azure-sdk-for-js/pull/9678)
- Standardized methods that peek and receive given number messages to use similar signature.
[PR 9798](https://github.com/Azure/azure-sdk-for-js/pull/9798)
have dedicated methods to deal with a single message. [PR 9678](https://github.com/Azure/azure-sdk-for-js/pull/9678)
- Standardized methods that peek and receive given number messages to use similar signature.
[PR 9798](https://github.com/Azure/azure-sdk-for-js/pull/9798)
- Bug - Messages scheduled in parallel with the `scheduleMessage` method have the same sequence number in response.
Fixed in [PR 9503](https://github.com/Azure/azure-sdk-for-js/pull/9503)
- Management api updates (Includes breaking changes)
Expand Down
4 changes: 3 additions & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ export interface Receiver<ReceivedMessageT> {
receiveDeferredMessages(sequenceNumbers: Long | Long[], options?: OperationOptions): Promise<ReceivedMessageT[]>;
receiveMessages(maxMessageCount: number, options?: ReceiveMessagesOptions): Promise<ReceivedMessageT[]>;
receiveMode: "peekLock" | "receiveAndDelete";
subscribe(handlers: MessageHandlers<ReceivedMessageT>, options?: SubscribeOptions): void;
subscribe(handlers: MessageHandlers<ReceivedMessageT>, options?: SubscribeOptions): {
close(): Promise<void>;
};
}

// @public
Expand Down
95 changes: 90 additions & 5 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import {
OnAmqpEvent,
Receiver,
ReceiverOptions,
isAmqpError
isAmqpError,
ReceiverEvents
} from "rhea-promise";
import * as log from "../log";
import { LinkEntity } from "./linkEntity";
Expand Down Expand Up @@ -246,16 +247,24 @@ export class MessageReceiver extends LinkEntity {
*/
private _isDetaching: boolean = false;

public get receiverHelper(): ReceiverHelper {
return this._receiverHelper;
}
private _receiverHelper: ReceiverHelper;

constructor(context: ClientEntityContext, receiverType: ReceiverType, options?: ReceiveOptions) {
super(context.entityPath, context, {
address: context.entityPath,
audience: `${context.namespace.config.endpoint}${context.entityPath}`
});

if (!options) options = {};
this._retryOptions = options.retryOptions || {};
this.wasCloseInitiated = false;
this.receiverType = receiverType;
this.receiveMode = options.receiveMode || ReceiveMode.peekLock;
this._receiverHelper = new ReceiverHelper(() => this._receiver);

if (typeof options.maxConcurrentCalls === "number" && options.maxConcurrentCalls > 0) {
this.maxConcurrentCalls = options.maxConcurrentCalls;
}
Expand Down Expand Up @@ -513,9 +522,7 @@ export class MessageReceiver extends LinkEntity {
}
return;
} finally {
if (this._receiver) {
this._receiver.addCredit(1);
}
this._receiverHelper.addCredit(1);
}

// If we've made it this far, then user's message handler completed fine. Let us try
Expand Down Expand Up @@ -982,7 +989,7 @@ export class MessageReceiver extends LinkEntity {
await this.close();
} else {
if (this._receiver && this.receiverType === ReceiverType.streaming) {
this._receiver.addCredit(this.maxConcurrentCalls);
this._receiverHelper.addCredit(this.maxConcurrentCalls);
}
}
return;
Expand Down Expand Up @@ -1138,3 +1145,81 @@ export class MessageReceiver extends LinkEntity {
return result;
}
}

/**
* Wraps the receiver with some higher level operations for managing state
* like credits, draining, etc...
*
* @internal
* @ignore
*/
export class ReceiverHelper {
private _stopReceivingMessages: boolean = false;

constructor(private _getCurrentReceiver: () => Receiver | undefined) {}

/**
* Adds credits to the receiver, respecting any state that
* indicates the receiver is closed or should not continue
* to receive more messages.
*
* @param credits Number of credits to add.
*/
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
public addCredit(credits: number): boolean {
const receiver = this._getCurrentReceiver();

if (this._stopReceivingMessages || receiver == null) {
return false;
}

receiver.addCredit(credits);
return true;
}

/**
* Prevents us from receiving any further messages.
*/
public async stopReceivingMessages(): Promise<void> {
const receiver = this._getCurrentReceiver();

if (receiver == null) {
return;
}

log.receiver(
`[${receiver.name}] User has requested to stop receiving new messages, attempting to drain the credits.`
);
this._stopReceivingMessages = true;

return this.drain();
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Initiates a drain for the current receiver and resolves when
* the drain has completed.
*/
public async drain(): Promise<void> {
const receiver = this._getCurrentReceiver();

if (receiver == null) {
return;
}

log.receiver(`[${receiver.name}] Receiver is starting drain.`);

const drainPromise = new Promise<void>((resolve) => {
receiver.once(ReceiverEvents.receiverDrained, () => {
log.receiver(`[${receiver.name}] Receiver has been drained.`);
receiver.drain = false;
resolve();
});

receiver.drain = true;
// this is not actually adding another credit - it'll just
// cause the drain call to start.
receiver.addCredit(1);
});

return drainPromise;
}
}
5 changes: 1 addition & 4 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ export class StreamingReceiver extends MessageReceiver {
throwErrorIfConnectionClosed(this._context.namespace);
this._onMessage = onMessage;
this._onError = onError;

if (this._receiver) {
this._receiver.addCredit(this.maxConcurrentCalls);
}
this.receiverHelper.addCredit(this.maxConcurrentCalls);
}

/**
Expand Down
23 changes: 21 additions & 2 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,15 @@ export interface Receiver<ReceivedMessageT> {
* @param handlers A handler that gets called for messages and errors.
* @param options Options for subscribe.
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
*/
subscribe(handlers: MessageHandlers<ReceivedMessageT>, options?: SubscribeOptions): void;
subscribe(
handlers: MessageHandlers<ReceivedMessageT>,
options?: SubscribeOptions
): {
/**
* Causes the subscriber to stop receiving new messages.
*/
close(): Promise<void>;
};

/**
* Returns an iterator that can be used to receive messages from Service Bus.
Expand Down Expand Up @@ -381,7 +389,12 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
return retry<ReceivedMessage[]>(config);
}

subscribe(handlers: MessageHandlers<ReceivedMessageT>, options?: SubscribeOptions): void {
subscribe(
handlers: MessageHandlers<ReceivedMessageT>,
options?: SubscribeOptions
): {
close(): Promise<void>;
} {
assertValidMessageHandlers(handlers);

const processError = wrapProcessErrorHandler(handlers);
Expand All @@ -393,6 +406,12 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
processError,
options
);

return {
close: async (): Promise<void> => {
return this._context.streamingReceiver?.receiverHelper.stopReceivingMessages();
}
};
}

async close(): Promise<void> {
Expand Down
13 changes: 12 additions & 1 deletion sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,12 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
return retry<ReceivedMessageT[]>(config);
}

subscribe(handlers: MessageHandlers<ReceivedMessageT>, options?: SubscribeOptions): void {
subscribe(
handlers: MessageHandlers<ReceivedMessageT>,
options?: SubscribeOptions
): {
close(): Promise<void>;
} {
// TODO - receiverOptions for subscribe??
assertValidMessageHandlers(handlers);

Expand All @@ -446,6 +451,12 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
processError,
options
);

return {
close: async (): Promise<void> => {
return this._messageSession?.receiverHelper.stopReceivingMessages();
}
};
}

/**
Expand Down
28 changes: 19 additions & 9 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ import {
isAmqpError
} from "rhea-promise";
import * as log from "../log";
import { OnAmqpEventAsPromise, OnError, OnMessage, PromiseLike } from "../core/messageReceiver";
import {
OnAmqpEventAsPromise,
OnError,
OnMessage,
PromiseLike,
ReceiverHelper
} from "../core/messageReceiver";
import { LinkEntity } from "../core/linkEntity";
import { ClientEntityContext } from "../clientEntityContext";
import { calculateRenewAfterDuration, convertTicksToDate } from "../util/utils";
Expand Down Expand Up @@ -250,6 +256,11 @@ export class MessageSession extends LinkEntity {

private _totalAutoLockRenewDuration: number;

public get receiverHelper(): ReceiverHelper {
return this._receiverHelper;
}
private _receiverHelper: ReceiverHelper;

/**
* Ensures that the session lock is renewed before it expires. The lock will not be renewed for
* more than the configured totalAutoLockRenewDuration.
Expand Down Expand Up @@ -364,15 +375,15 @@ export class MessageSession extends LinkEntity {
// SB allows a sessionId with empty string value :)

if (this.sessionId == null && receivedSessionId == null) {
// Ideally this code path should never be reached as `createReceiver()` should fail instead
// Ideally this code path should never be reached as `createReceiver()` should fail instead
// TODO: https://github.com/Azure/azure-sdk-for-js/issues/9775 to figure out why this code path indeed gets hit.
errorMessage = `No unlocked sessions were available`;
} else if (this.sessionId != null && receivedSessionId !== this.sessionId) {
// This code path is reached if the session is already locked by another receiver.
// TODO: Check why the service would not throw an error or just timeout instead of giving a misleading successful receiver
errorMessage = `Failed to get a lock on the session ${this.sessionId};`
errorMessage = `Failed to get a lock on the session ${this.sessionId};`;
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
}

if (errorMessage) {
const error = translate({
description: errorMessage,
Expand Down Expand Up @@ -476,6 +487,7 @@ export class MessageSession extends LinkEntity {
});
this._context.isSessionEnabled = true;
this.isReceivingMessages = false;
this._receiverHelper = new ReceiverHelper(() => this._receiver);
if (!options) options = { sessionId: undefined };
this.autoComplete = false;
this.sessionId = options.sessionId;
Expand Down Expand Up @@ -846,9 +858,7 @@ export class MessageSession extends LinkEntity {
}
return;
} finally {
if (this._receiver) {
this._receiver!.addCredit(1);
}
this._receiverHelper.addCredit(1);
}

// If we've made it this far, then user's message handler completed fine. Let us try
Expand Down Expand Up @@ -883,7 +893,7 @@ export class MessageSession extends LinkEntity {
// setting the "message" event listener.
this._receiver.on(ReceiverEvents.message, onSessionMessage);
// adding credit
this._receiver!.addCredit(this.maxConcurrentCalls);
this._receiverHelper.addCredit(this.maxConcurrentCalls);
} else {
this.isReceivingMessages = false;
const msg =
Expand Down Expand Up @@ -1077,7 +1087,7 @@ export class MessageSession extends LinkEntity {
// number of messages concurrently. We will return the user an array of messages that can
// be of size upto maxMessageCount. Then the user needs to accordingly dispose
// (complete,/abandon/defer/deadletter) the messages from the array.
this._receiver!.addCredit(maxMessageCount);
this._receiverHelper.addCredit(maxMessageCount);
let msg: string = "[%s] Setting the wait timer for %d milliseconds for receiver '%s'.";
if (reuse) msg += " Receiver link already present, hence reusing it.";
log.batching(msg, this._context.namespace.connectionId, maxWaitTimeInMs, this.name);
Expand Down
30 changes: 25 additions & 5 deletions sdk/servicebus/service-bus/test/batchReceiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,48 @@ describe("batchReceiver", () => {
// @ts-expect-error
const peekedMsgs = await receiver.peekMessages();
should.equal(peekedMsgs.length, 1, "Unexpected number of messages peeked.");
should.equal(peekedMsgs[0].body, testMessage.body, "Peeked message body is different than expected");
should.equal(
peekedMsgs[0].body,
testMessage.body,
"Peeked message body is different than expected"
);

// @ts-expect-error
const msgs = await receiver.receiveMessages();
should.equal(msgs.length, 1, "Unexpected number of messages received.");
should.equal(msgs[0].body, testMessage.body, "Received message body is different than expected");
should.equal(
msgs[0].body,
testMessage.body,
"Received message body is different than expected"
);
await msgs[0].complete();
});

it("Partitioned Queue with Sessions- maxMessageCount defaults to 1", async function(): Promise<void> {
it("Partitioned Queue with Sessions- maxMessageCount defaults to 1", async function(): Promise<
void
> {
await beforeEachTest(TestClientType.PartitionedQueueWithSessions);
const testMessage = TestMessage.getSessionSample();
await sender.sendMessages(testMessage);

// @ts-expect-error
const peekedMsgs = await receiver.peekMessages();
should.equal(peekedMsgs.length, 1, "Unexpected number of messages peeked.");
should.equal(peekedMsgs[0].body, testMessage.body, "Peeked message body is different than expected");
should.equal(
peekedMsgs[0].body,
testMessage.body,
"Peeked message body is different than expected"
);

// @ts-expect-error
const msgs = await receiver.receiveMessages();
should.equal(msgs.length, 1, "Unexpected number of messages received.");
should.equal(msgs[0].body, testMessage.body, "Received message body is different than expected");
should.equal(
msgs[0].body,
testMessage.body,
"Received message body is different than expected"
);
await msgs[0].complete();
});

describe("Batch Receiver - Settle message", function(): void {
Expand Down
Loading