Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Adding in support for stopping a Subscription for session subscribe #9849

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b1b07b6
Adding in support for stopping a Subscription.
richardpark-msft Jun 30, 2020
28c5cb1
There is no 'we'!
richardpark-msft Jun 30, 2020
63704d9
Adding in some more logging for the critical points.
richardpark-msft Jun 30, 2020
fa8e5bc
Merge branch 'richardpark-sb-track2-subscriber-close' of https://gith…
richardpark-msft Jun 30, 2020
fc14de0
Adding a bit more protection when adding credits to the receiver.
richardpark-msft Jun 30, 2020
e7bf7f4
Remove commented line.
richardpark-msft Jun 30, 2020
4093760
Add in Harsha's edit for log message.
richardpark-msft Jun 30, 2020
327d1fd
We can cleanup ALL the instances of using .addCredit()!
richardpark-msft Jun 30, 2020
049f9b5
Merge branch 'richardpark-sb-track2-subscriber-close' of https://gith…
richardpark-msft Jun 30, 2020
02f0a0c
Merge remote-tracking branch 'upstream/master' into richardpark-sb-tr…
richardpark-msft Jun 30, 2020
3818251
Remove comment - I _think_ we're okay with the this._receiver check o…
richardpark-msft Jun 30, 2020
3b008e5
Some new tests were added that don't properly clean up after themselves.
richardpark-msft Jul 1, 2020
4843f51
- Making it so session receivers can also close() a Subscription.
richardpark-msft Jul 1, 2020
39b1d53
Adding in changelog entry
richardpark-msft Jul 1, 2020
e3dd502
Merge remote-tracking branch 'upstream/master' into richardpark-sb-tr…
richardpark-msft Jul 1, 2020
9fcd6de
Merge remote-tracking branch 'upstream/master' into richardpark-sb-tr…
richardpark-msft Jul 2, 2020
e68f4c6
Add returns to the addCredit method to explain what the boolean means.
richardpark-msft Jul 2, 2020
fa4e1fa
Added @returns to the subscribe method to describe what the close() m…
richardpark-msft Jul 2, 2020
2b64f9e
Save some money on the bottom line and eliminate unneeded semi-colon …
richardpark-msft Jul 2, 2020
cd9bb47
Merge remote-tracking branch 'upstream/master' into richardpark-sb-tr…
richardpark-msft Jul 2, 2020
cde1726
Remove duplicate function.
richardpark-msft Jul 2, 2020
259cae7
Merge remote-tracking branch 'upstream/master' into richardpark-sb-tr…
richardpark-msft Jul 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
- Adds abortSignal support throughout Sender and non-session Receivers.
[PR 9233](https://github.com/Azure/azure-sdk-for-js/pull/9233)
[PR 9284](https://github.com/Azure/azure-sdk-for-js/pull/9284)
- (Receiver|SessionReceiver).subscribe() now returns a closeable object which will stop new messages from arriving
but still leave the receiver open so they can be settled via methods like complete().
[PR 9802](https://github.com/Azure/azure-sdk-for-js/pull/9802)
[PR 9849](https://github.com/Azure/azure-sdk-for-js/pull/9849)
- Bug - Messages scheduled in parallel with the `scheduleMessage` method have the same sequence number in response.
Fixed in [PR 9503](https://github.com/Azure/azure-sdk-for-js/pull/9503)
- Management api updates
Expand Down
96 changes: 93 additions & 3 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,24 @@ export class MessageReceiver extends LinkEntity {
private _isDetaching: boolean = false;
private _stopReceivingMessages: boolean = false;

public get receiverHelper(): ReceiverHelper {
return this._receiverHelper;
}
private _receiverHelper: ReceiverHelper;

constructor(context: ClientEntityContext, receiverType: ReceiverType, options?: ReceiveOptions) {
super(context.entityPath, context, {
address: context.entityPath,
audience: `${context.namespace.config.endpoint}${context.entityPath}`
});

if (!options) options = {};
this._retryOptions = options.retryOptions || {};
this.wasCloseInitiated = false;
this.receiverType = receiverType;
this.receiveMode = options.receiveMode || ReceiveMode.peekLock;
this._receiverHelper = new ReceiverHelper(() => this._receiver);

if (typeof options.maxConcurrentCalls === "number" && options.maxConcurrentCalls > 0) {
this.maxConcurrentCalls = options.maxConcurrentCalls;
}
Expand Down Expand Up @@ -515,7 +523,7 @@ export class MessageReceiver extends LinkEntity {
}
return;
} finally {
this.addCredit(1);
this._receiverHelper.addCredit(1);
}

// If we've made it this far, then user's message handler completed fine. Let us try
Expand Down Expand Up @@ -715,7 +723,9 @@ export class MessageReceiver extends LinkEntity {
* Prevents us from receiving any further messages.
*/
public stopReceivingMessages(): Promise<void> {
log.receiver(`[${this.name}] User has requested to stop receiving new messages, attempting to drain the credits.`);
log.receiver(
`[${this.name}] User has requested to stop receiving new messages, attempting to drain the credits.`
);
this._stopReceivingMessages = true;

return this.drainReceiver();
Expand Down Expand Up @@ -1032,7 +1042,7 @@ export class MessageReceiver extends LinkEntity {
await this.close();
} else {
if (this._receiver && this.receiverType === ReceiverType.streaming) {
this.addCredit(this.maxConcurrentCalls);
this._receiverHelper.addCredit(this.maxConcurrentCalls);
}
}
return;
Expand Down Expand Up @@ -1188,3 +1198,83 @@ export class MessageReceiver extends LinkEntity {
return result;
}
}

/**
* Wraps the receiver with some higher level operations for managing state
* like credits, draining, etc...
*
* @internal
* @ignore
*/
export class ReceiverHelper {
private _stopReceivingMessages: boolean = false;

constructor(private _getCurrentReceiver: () => Receiver | undefined) {}

/**
* Adds credits to the receiver, respecting any state that
* indicates the receiver is closed or should not continue
* to receive more messages.
*
* @param credits Number of credits to add.
* @returns true if credits were added, false if there is no current receiver instance
* or `stopReceivingMessages` has been called.
*/
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
public addCredit(credits: number): boolean {
const receiver = this._getCurrentReceiver();

if (this._stopReceivingMessages || receiver == null) {
return false;
}

receiver.addCredit(credits);
return true;
}

/**
* Prevents us from receiving any further messages.
*/
public async stopReceivingMessages(): Promise<void> {
const receiver = this._getCurrentReceiver();

if (receiver == null) {
return;
}

log.receiver(
`[${receiver.name}] User has requested to stop receiving new messages, attempting to drain the credits.`
);
this._stopReceivingMessages = true;

return this.drain();
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Initiates a drain for the current receiver and resolves when
* the drain has completed.
*/
public async drain(): Promise<void> {
const receiver = this._getCurrentReceiver();

if (receiver == null) {
return;
}

log.receiver(`[${receiver.name}] Receiver is starting drain.`);

const drainPromise = new Promise<void>((resolve) => {
receiver.once(ReceiverEvents.receiverDrained, () => {
log.receiver(`[${receiver.name}] Receiver has been drained.`);
receiver.drain = false;
resolve();
});

receiver.drain = true;
// this is not actually adding another credit - it'll just
// cause the drain call to start.
receiver.addCredit(1);
});

return drainPromise;
}
}
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class StreamingReceiver extends MessageReceiver {
throwErrorIfConnectionClosed(this._context.namespace);
this._onMessage = onMessage;
this._onError = onError;
this.addCredit(this.maxConcurrentCalls);
this.receiverHelper.addCredit(this.maxConcurrentCalls);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export interface Receiver<ReceivedMessageT> {
* Streams messages to message handlers.
* @param handlers A handler that gets called for messages and errors.
* @param options Options for subscribe.
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
* @returns An object that can be closed, sending any remaining messages to `handlers` and
* stopping new messages from arriving.
*/
subscribe(
handlers: MessageHandlers<ReceivedMessageT>,
Expand Down Expand Up @@ -403,7 +405,7 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes

return {
close: async (): Promise<void> => {
return this._context.streamingReceiver?.stopReceivingMessages();
return this._context.streamingReceiver?.receiverHelper.stopReceivingMessages();
}
};
}
Expand Down
5 changes: 3 additions & 2 deletions sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,9 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
);

return {
// TODO: coming in a future PR.
close: async (): Promise<void> => {}
close: async (): Promise<void> => {
return this._messageSession?.receiverHelper.stopReceivingMessages();
}
};
}

Expand Down
28 changes: 19 additions & 9 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ import {
isAmqpError
} from "rhea-promise";
import * as log from "../log";
import { OnAmqpEventAsPromise, OnError, OnMessage, PromiseLike } from "../core/messageReceiver";
import {
OnAmqpEventAsPromise,
OnError,
OnMessage,
PromiseLike,
ReceiverHelper
} from "../core/messageReceiver";
import { LinkEntity } from "../core/linkEntity";
import { ClientEntityContext } from "../clientEntityContext";
import { calculateRenewAfterDuration, convertTicksToDate } from "../util/utils";
Expand Down Expand Up @@ -250,6 +256,11 @@ export class MessageSession extends LinkEntity {

private _totalAutoLockRenewDuration: number;

public get receiverHelper(): ReceiverHelper {
return this._receiverHelper;
}
private _receiverHelper: ReceiverHelper;

/**
* Ensures that the session lock is renewed before it expires. The lock will not be renewed for
* more than the configured totalAutoLockRenewDuration.
Expand Down Expand Up @@ -364,15 +375,15 @@ export class MessageSession extends LinkEntity {
// SB allows a sessionId with empty string value :)

if (this.sessionId == null && receivedSessionId == null) {
// Ideally this code path should never be reached as `createReceiver()` should fail instead
// Ideally this code path should never be reached as `createReceiver()` should fail instead
// TODO: https://github.com/Azure/azure-sdk-for-js/issues/9775 to figure out why this code path indeed gets hit.
errorMessage = `No unlocked sessions were available`;
} else if (this.sessionId != null && receivedSessionId !== this.sessionId) {
// This code path is reached if the session is already locked by another receiver.
// TODO: Check why the service would not throw an error or just timeout instead of giving a misleading successful receiver
errorMessage = `Failed to get a lock on the session ${this.sessionId};`
errorMessage = `Failed to get a lock on the session ${this.sessionId}`;
}

if (errorMessage) {
const error = translate({
description: errorMessage,
Expand Down Expand Up @@ -476,6 +487,7 @@ export class MessageSession extends LinkEntity {
});
this._context.isSessionEnabled = true;
this.isReceivingMessages = false;
this._receiverHelper = new ReceiverHelper(() => this._receiver);
if (!options) options = { sessionId: undefined };
this.autoComplete = false;
this.sessionId = options.sessionId;
Expand Down Expand Up @@ -846,9 +858,7 @@ export class MessageSession extends LinkEntity {
}
return;
} finally {
if (this._receiver) {
this._receiver!.addCredit(1);
}
this._receiverHelper.addCredit(1);
}

// If we've made it this far, then user's message handler completed fine. Let us try
Expand Down Expand Up @@ -883,7 +893,7 @@ export class MessageSession extends LinkEntity {
// setting the "message" event listener.
this._receiver.on(ReceiverEvents.message, onSessionMessage);
// adding credit
this._receiver!.addCredit(this.maxConcurrentCalls);
this._receiverHelper.addCredit(this.maxConcurrentCalls);
} else {
this.isReceivingMessages = false;
const msg =
Expand Down Expand Up @@ -1077,7 +1087,7 @@ export class MessageSession extends LinkEntity {
// number of messages concurrently. We will return the user an array of messages that can
// be of size upto maxMessageCount. Then the user needs to accordingly dispose
// (complete,/abandon/defer/deadletter) the messages from the array.
this._receiver!.addCredit(maxMessageCount);
this._receiverHelper.addCredit(maxMessageCount);
let msg: string = "[%s] Setting the wait timer for %d milliseconds for receiver '%s'.";
if (reuse) msg += " Receiver link already present, hence reusing it.";
log.batching(msg, this._context.namespace.connectionId, maxWaitTimeInMs, this.name);
Expand Down
58 changes: 29 additions & 29 deletions sdk/servicebus/service-bus/test/streamingReceiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1158,35 +1158,6 @@ describe("Streaming", () => {
});
});

function singleMessagePromise(
receiver: Receiver<ReceivedMessageWithLock>
): Promise<{
subscriber: ReturnType<Receiver<unknown>["subscribe"]>;
messages: ReceivedMessageWithLock[];
}> {
const messages: ReceivedMessageWithLock[] = [];

return new Promise<{
subscriber: ReturnType<Receiver<unknown>["subscribe"]>;
messages: ReceivedMessageWithLock[];
}>((resolve, reject) => {
const subscriber = receiver.subscribe(
{
processMessage: async (message) => {
messages.push(message);
resolve({ subscriber, messages });
},
processError: async (err) => {
reject(err);
}
},
{
autoComplete: false
}
);
});
}

describe("Streaming - onDetached", function(): void {
let serviceBusClient: ServiceBusClientForTests;
let sender: Sender;
Expand Down Expand Up @@ -1476,3 +1447,32 @@ describe("Streaming - disconnects", function(): void {
refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called.");
});
});

export function singleMessagePromise(
receiver: Receiver<ReceivedMessageWithLock>
): Promise<{
subscriber: ReturnType<Receiver<unknown>["subscribe"]>;
messages: ReceivedMessageWithLock[];
}> {
const messages: ReceivedMessageWithLock[] = [];

return new Promise<{
subscriber: ReturnType<Receiver<unknown>["subscribe"]>;
messages: ReceivedMessageWithLock[];
}>((resolve, reject) => {
const subscriber = receiver.subscribe(
{
processMessage: async (message) => {
messages.push(message);
resolve({ subscriber, messages });
},
processError: async (err) => {
reject(err);
}
},
{
autoComplete: false
}
);
});
}
Loading