Skip to content

Commit

Permalink
[Service Bus] Update Delays in Streaming Receiver (Azure#1214)
Browse files Browse the repository at this point in the history
  • Loading branch information
HarshaNalluru authored and ramya-rao-a committed Feb 15, 2019
1 parent f24e10d commit 07d7580
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
TopicClient,
SubscriptionClient,
ServiceBusMessage,
delay,
SendableMessageInfo,
ReceiveMode
} from "../lib";
Expand All @@ -26,7 +25,8 @@ import {
testSessionId1,
getSenderReceiverClients,
ClientType,
purge
purge,
checkWithTimeout
} from "./testUtils";

import { Receiver, SessionReceiver } from "../lib/receiver";
Expand Down Expand Up @@ -219,7 +219,8 @@ describe("Streaming Receiver from Queue/Subscription", function(): void {
{ autoComplete: autoCompleteFlag }
);

await delay(2000);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
should.equal(receivedMsgs[0].body, testMessages.body, "MessageBody is different than expected");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import {
getSenderReceiverClients,
ClientType,
testSessionId1,
purge
purge,
checkWithTimeout
} from "./testUtils";

async function testPeekMsgsLength(
Expand Down Expand Up @@ -198,8 +199,10 @@ describe("SessionTests - Accept a session by passing non-existing sessionId rece
return Promise.resolve();
}, unExpectedErrorHandler);

await delay(2000);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

await testPeekMsgsLength(receiverClient, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ import {

import { DispositionType } from "../lib/serviceBusMessage";

import { testSimpleMessages, getSenderReceiverClients, ClientType, purge } from "./testUtils";
import {
testSimpleMessages,
getSenderReceiverClients,
ClientType,
purge,
checkWithTimeout
} from "./testUtils";
import { Receiver } from "../lib/receiver";
import { Sender } from "../lib/sender";

Expand Down Expand Up @@ -123,13 +129,9 @@ describe("Streaming Receiver - Misc Tests", function(): void {
return Promise.resolve();
}, unExpectedErrorHandler);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);

should.equal(msgsCheck, true, "Could not receive the messages in expected time.");
await receiver.close();

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
Expand Down Expand Up @@ -180,14 +182,11 @@ describe("Streaming Receiver - Misc Tests", function(): void {
{ autoComplete: false }
);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);

should.equal(msgsCheck, true, "Could not receive the messages in expected time.");
await testPeekMsgsLength(receiverClient, 1);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");

await receivedMsgs[0].complete();
await receiver.close();
Expand Down Expand Up @@ -248,16 +247,12 @@ describe("Streaming Receiver - Complete message", function(): void {
{ autoComplete }
);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

await receiver.close();
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
await testPeekMsgsLength(receiverClient, 0);
}
it("Partitioned Queue: complete() removes message", async function(): Promise<void> {
Expand Down Expand Up @@ -333,13 +328,12 @@ describe("Streaming Receiver - Abandon message", function(): void {
{ autoComplete: false }
);

await delay(6000);
const deliveryCountFlag = await checkWithTimeout(() => checkDeliveryCount === maxDeliveryCount);
should.equal(deliveryCountFlag, true, "DeliveryCount is different than expected");

await receiver.close();
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

should.equal(checkDeliveryCount, maxDeliveryCount, "DeliveryCount is different than expected");

await testPeekMsgsLength(receiverClient, 0); // No messages in the queue

const deadLetterMsgs = await deadLetterClient.getReceiver().receiveBatch(1);
Expand Down Expand Up @@ -406,7 +400,14 @@ describe("Streaming Receiver - Defer message", function(): void {
unExpectedErrorHandler,
{ autoComplete }
);
await delay(4000);

const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0);
should.equal(
sequenceNumCheck,
true,
"Either the message is not received or observed an unexpected SequenceNumber."
);

await receiver.close();
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

Expand All @@ -429,7 +430,6 @@ describe("Streaming Receiver - Defer message", function(): void {
should.equal(deferredMsgs[0].deliveryCount, 1, "DeliveryCount is different than expected");

await deferredMsgs[0].complete();
await delay(10000);
await testPeekMsgsLength(receiverClient, 0);
}

Expand Down Expand Up @@ -496,17 +496,22 @@ describe("Streaming Receiver - Deadletter message", function(): void {
async function testDeadletter(autoComplete: boolean): Promise<void> {
await sender.send(testSimpleMessages);

const receivedMsgs: ServiceBusMessage[] = [];
receiver.receive(
(msg: ServiceBusMessage) => {
receivedMsgs.push(msg);
return msg.deadLetter();
},
unExpectedErrorHandler,
{ autoComplete }
);

await delay(4000);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

await receiver.close();
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");

await testPeekMsgsLength(receiverClient, 0);

Expand All @@ -520,7 +525,6 @@ describe("Streaming Receiver - Deadletter message", function(): void {
);

await deadLetterMsgs[0].complete();

await testPeekMsgsLength(deadLetterClient, 0);
}

Expand Down Expand Up @@ -664,7 +668,9 @@ describe("Streaming Receiver - Settle an already Settled message throws error",
return Promise.resolve();
}, unExpectedErrorHandler);

await delay(5000);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import {
testSessionId1,
getSenderReceiverClients,
ClientType,
purge
purge,
checkWithTimeout
} from "./testUtils";
import { Sender } from "../lib/sender";
import { SessionReceiver } from "../lib/receiver";
Expand Down Expand Up @@ -129,12 +130,9 @@ describe("Streaming Receiver - Misc Tests(with sessions)", function(): void {
return Promise.resolve();
}, unExpectedErrorHandler);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
await testPeekMsgsLength(receiverClient, 0);
Expand Down Expand Up @@ -203,18 +201,15 @@ describe("Streaming Receiver - Misc Tests(with sessions)", function(): void {
{ autoComplete: false }
);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

await testPeekMsgsLength(receiverClient, 1);

await receivedMsgs[0].complete();

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
}

it("Disabled autoComplete, no manual complete retains the message in Partitioned Queue(with sessions)", async function(): Promise<
Expand Down Expand Up @@ -286,14 +281,11 @@ describe("Streaming Receiver - Complete message(with sessions)", function(): voi
{ autoComplete }
);

for (let i = 0; i < 5; i++) {
await delay(1000);
if (receivedMsgs.length === 1) {
break;
}
}
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(receivedMsgs.length, 1, "Unexpected number of messages");

await testPeekMsgsLength(receiverClient, 0);
}
Expand Down Expand Up @@ -385,10 +377,11 @@ describe("Streaming Receiver - Abandon message(with sessions)", function(): void

async function testAbandon(autoComplete: boolean): Promise<void> {
await sender.send(testMessagesWithSessions);

let abandonFlag = 0;
await sessionReceiver.receive(
(msg: ServiceBusMessage) => {
return msg.abandon().then(() => {
abandonFlag = 1;
if (sessionReceiver.isOpen()) {
return sessionReceiver.close();
}
Expand All @@ -398,7 +391,9 @@ describe("Streaming Receiver - Abandon message(with sessions)", function(): void
unExpectedErrorHandler,
{ autoComplete }
);
await delay(4000);

const msgAbandonCheck = await checkWithTimeout(() => abandonFlag === 1);
should.equal(msgAbandonCheck, true, "Abandoning the message results in a failure");

if (sessionReceiver.isOpen()) {
await sessionReceiver.close();
Expand Down Expand Up @@ -518,7 +513,12 @@ describe("Streaming Receiver - Defer message(with sessions)", function(): void {
{ autoComplete }
);

await delay(4000);
const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0);
should.equal(
sequenceNumCheck,
true,
"Either the message is not received or observed an unexpected SequenceNumber."
);

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

Expand All @@ -540,7 +540,6 @@ describe("Streaming Receiver - Defer message(with sessions)", function(): void {
should.equal(deferredMsg.deliveryCount, 1, "DeliveryCount is different than expected");

await deferredMsg.complete();

await testPeekMsgsLength(receiverClient, 0);
}
it("Partitioned Queue: defer() moves message to deferred queue(with sessions)", async function(): Promise<
Expand Down Expand Up @@ -632,17 +631,21 @@ describe("Streaming Receiver - Deadletter message(with sessions)", function(): v
async function testDeadletter(autoComplete: boolean): Promise<void> {
await sender.send(testMessagesWithSessions);

let msgCount = 0;
await sessionReceiver.receive(
(msg: ServiceBusMessage) => {
msgCount++;
return msg.deadLetter();
},
unExpectedErrorHandler,
{ autoComplete }
);

await delay(4000);
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
const msgsCheck = await checkWithTimeout(() => msgCount === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");

should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);
should.equal(msgCount, 1, "Unexpected number of messages");
await testPeekMsgsLength(receiverClient, 0);

const deadLetterMsgs = await deadLetterClient.getReceiver().receiveBatch(1);
Expand Down Expand Up @@ -833,7 +836,8 @@ describe("Streaming Receiver - Settle an already Settled message throws error(wi
return Promise.resolve();
}, unExpectedErrorHandler);

await delay(5000);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, "Could not receive the messages in expected time.");
should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message);

should.equal(receivedMsgs.length, 1, "Unexpected number of messages");
Expand Down
20 changes: 19 additions & 1 deletion packages/@azure/servicebus/data-plane/test/testUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import {
QueueClient,
TopicClient,
Namespace,
SubscriptionClient
SubscriptionClient,
delay
} from "../lib";
import * as msRestNodeAuth from "@azure/ms-rest-nodeauth";
import { ServiceBusManagementClient } from "@azure/arm-servicebus";
Expand Down Expand Up @@ -383,3 +384,20 @@ export async function purge(
}
}
}

/**
* Maximum wait duration for the expected event to happen = `10000 ms`(default value is 10 seconds)(= maxWaitTimeInMilliseconds)
* Keep checking whether the predicate is true after every `1000 ms`(default value is 1 second) (= delayBetweenRetriesInMilliseconds)
*/
export async function checkWithTimeout(
predicate: () => boolean,
delayBetweenRetriesInMilliseconds: number = 1000,
maxWaitTimeInMilliseconds: number = 10000
): Promise<boolean> {
const maxTime = Date.now() + maxWaitTimeInMilliseconds;
while (Date.now() < maxTime) {
if (predicate()) return true;
await delay(delayBetweenRetriesInMilliseconds);
}
return false;
}
Loading

0 comments on commit 07d7580

Please sign in to comment.