diff --git a/sdk/servicebus/service-bus/README.md b/sdk/servicebus/service-bus/README.md index 4695fcbd8115..46d039ad2a65 100644 --- a/sdk/servicebus/service-bus/README.md +++ b/sdk/servicebus/service-bus/README.md @@ -106,12 +106,12 @@ This gives you a sender which you can use to [send][sender_send] messages. const sender = serviceBusClient.createSender("my-queue"); // sending a single message -await sender.send({ +await sender.sendMessages({ body: "my-message-body" }); // sending multiple messages -await sender.send([ +await sender.sendMessages([ { body: "my-message-body" }, @@ -124,7 +124,7 @@ await sender.send([ ### Receive messages Once you have created an instance of a `ServiceBusClient` class, you can get a `Receiver` -using the [createReceiver][sbclient_createreceiver] function. +using the [createReceiver][sbclient_createreceiver] method. ```javascript const receiver = serviceBusClient.createReceiver("my-queue", "peekLock"); @@ -134,11 +134,11 @@ You can use this receiver in one of 3 ways to receive messages: #### Get an array of messages -Use the [receiveBatch][receiverreceivebatch] function which returns a promise that +Use the [receiveMessages][receiverreceivebatch] function which returns a promise that resolves to an array of messages. ```javascript -const myMessages = await receiver.receiveBatch(10); +const myMessages = await receiver.receiveMessages(10); ``` #### Subscribe using a message handler @@ -191,7 +191,7 @@ your message lands in the right session. ```javascript const sender = serviceBusClient.createSender("my-session-queue"); -await sender.send({ +await sender.sendMessages({ body: "my-message-body", sessionId: "my-session" }); diff --git a/sdk/servicebus/service-bus/migrationguide.md b/sdk/servicebus/service-bus/migrationguide.md index 4fcc0ce4d9fe..b06dabaca6ff 100644 --- a/sdk/servicebus/service-bus/migrationguide.md +++ b/sdk/servicebus/service-bus/migrationguide.md @@ -12,6 +12,8 @@ brings this package in line with the [Azure SDK Design Guidelines for Typescript ## API changes from V1 to V7 +### Creating ServiceBusClient + - `ServiceBusClient` can now be constructed using new(). The static methods to construct it have been removed. @@ -27,6 +29,8 @@ brings this package in line with the [Azure SDK Design Guidelines for Typescript const serviceBusClient = new ServiceBusClient("connection string"); ``` +### Creating senders and receivers + - `QueueClient`, `TopicClient` and `SubscriptionClient` have been replaced with methods to create receivers and senders directly from `ServiceBusClient`. @@ -67,6 +71,20 @@ brings this package in line with the [Azure SDK Design Guidelines for Typescript const subscriptionReceiver = serviceBusClient.createReceiver("topic", "subscription", "peekLock"); ``` +- `createSessionReceiver()` is now an async method. + - The promise returned by this method is resolved when a receiver link has been initialized with a session in the service. + - Prior to v7 `createSessionReceiver()` worked using lazy-initialization, where the +receiver link to the session was only initialized when the async methods on the `SessionReceiver` +were first called. + +### Receiving messages + +* `peek()` and `peekBySequenceNumber()` methods are collapsed into a single method `peekMessages()`. +The options passed to this new method accomodates both number of messages to be peeked and the sequence number to peek from. + +* `receiveBatch()` method is renamed to `receiveMessages()` to be consistent in usage of the `Messages` suffix in other methods +on the receiver and the sender. + * `registerMessageHandler` on `Receiver` has been renamed to `subscribe` and takes different arguments. In V1: @@ -87,32 +105,28 @@ brings this package in line with the [Azure SDK Design Guidelines for Typescript }); ``` -* `peekBySequenceNumber()`is removed in favor of an overload to `peekMessages()` that would take the sequence number to start peeking from in the options. +### Rule management -* Subscription rule management has been moved to its own class, rather than being part of the now-removed `SubscriptionClient` +* The add/get/remove rule operations on the older `SubscriptionClient` have moved to the new `ServiceBusManagementClient` class which will be supporting +Create, Get, Update and Delete operations on Queues, Topics, Subscriptions and Rules. In V1: ```typescript - subscriptionClient.addRule(); - subscriptionClient.getRules(); - subscriptionClient.removeRule(); + await subscriptionClient.addRule(); + await subscriptionClient.getRules(); + await subscriptionClient.removeRule(); ``` In V7: ```typescript - const ruleManager = serviceBusClient.getSubscriptionRuleManager("topic", "subscription"); - ruleManager.addRule(); - ruleManager.getRules(); - ruleManager.removeRule(); + const serviceBusManagementClient = new ServiceBusManagementClient(connectionString); + await serviceBusManagementClient.createRule(); + await serviceBusManagementClient.getRules(); + await serviceBusManagementClient.deleteRule(); ``` -* createSessionReceiver() is now an async method. The promise returned by this method - is resolved when a receiver link has been initialized with a session in the service. -Prior to v7 `createSessionReceiver()` worked using lazy-initialization, where the -receiver link to the session was only initialized when the async methods on the `SessionReceiver` -were first called. ![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-js%2Fsdk%2Fservicebus%2Fservice-bus%2FMIGRATIONGUIDE.png) diff --git a/sdk/servicebus/service-bus/review/service-bus.api.md b/sdk/servicebus/service-bus/review/service-bus.api.md index dea722fbe1b7..6a3eab6f9561 100644 --- a/sdk/servicebus/service-bus/review/service-bus.api.md +++ b/sdk/servicebus/service-bus/review/service-bus.api.md @@ -175,10 +175,6 @@ export interface QueuesResponse extends Array, Response { export interface QueuesRuntimeInfoResponse extends Array, Response { } -// @public -export interface ReceiveBatchOptions extends OperationOptions, WaitTimeOptions { -} - // @public export interface ReceivedMessage extends ServiceBusMessage { readonly _amqpMessage: AmqpMessage; @@ -207,6 +203,10 @@ export interface ReceivedMessageWithLock extends ReceivedMessage { renewLock(): Promise; } +// @public +export interface ReceiveMessagesOptions extends OperationOptions, WaitTimeOptions { +} + // @public export interface Receiver { close(): Promise; @@ -215,9 +215,8 @@ export interface Receiver { isClosed: boolean; isReceivingMessages(): boolean; peekMessages(options?: PeekMessagesOptions): Promise; - receiveBatch(maxMessages: number, options?: ReceiveBatchOptions): Promise; - receiveDeferredMessage(sequenceNumber: Long, options?: OperationOptions): Promise; - receiveDeferredMessages(sequenceNumbers: Long[], options?: OperationOptions): Promise; + receiveDeferredMessages(sequenceNumbers: Long | Long[], options?: OperationOptions): Promise; + receiveMessages(maxMessages: number, options?: ReceiveMessagesOptions): Promise; receiveMode: "peekLock" | "receiveAndDelete"; subscribe(handlers: MessageHandlers, options?: SubscribeOptions): void; } @@ -246,18 +245,14 @@ export interface RulesResponse extends Array, Response { // @public export interface Sender { - cancelScheduledMessage(sequenceNumber: Long, options?: OperationOptions): Promise; - cancelScheduledMessages(sequenceNumbers: Long[], options?: OperationOptions): Promise; + cancelScheduledMessages(sequenceNumbers: Long | Long[], options?: OperationOptions): Promise; close(): Promise; createBatch(options?: CreateBatchOptions): Promise; entityPath: string; isClosed: boolean; open(options?: SenderOpenOptions): Promise; - scheduleMessage(scheduledEnqueueTimeUtc: Date, message: ServiceBusMessage, options?: OperationOptions): Promise; - scheduleMessages(scheduledEnqueueTimeUtc: Date, messages: ServiceBusMessage[], options?: OperationOptions): Promise; - send(message: ServiceBusMessage, options?: OperationOptions): Promise; - send(messages: ServiceBusMessage[], options?: OperationOptions): Promise; - send(messageBatch: ServiceBusMessageBatch, options?: OperationOptions): Promise; + scheduleMessages(scheduledEnqueueTimeUtc: Date, messages: ServiceBusMessage | ServiceBusMessage[], options?: OperationOptions): Promise; + sendMessages(messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch, options?: OperationOptions): Promise; } // @public diff --git a/sdk/servicebus/service-bus/samples-v1/typescript/src/scheduledMessages.ts b/sdk/servicebus/service-bus/samples-v1/typescript/src/scheduledMessages.ts index 3da9cc6f7f86..eaff179b6c28 100644 --- a/sdk/servicebus/service-bus/samples-v1/typescript/src/scheduledMessages.ts +++ b/sdk/servicebus/service-bus/samples-v1/typescript/src/scheduledMessages.ts @@ -2,7 +2,7 @@ Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT Licence. - This sample demonstrates how the scheduleMessage() function can be used to schedule messages to + This sample demonstrates how the scheduleMessages() function can be used to schedule messages to appear on a Service Bus Queue/Subscription at a later time. See https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sequencing#scheduled-messages diff --git a/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js b/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js index 668b4c1ed7c2..41a08215c001 100644 --- a/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js +++ b/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js @@ -52,7 +52,7 @@ async function sendMessages() { promises.push( delay(Math.random() * 30).then(async () => { try { - await sender.send(message); + await sender.sendMessages(message); console.log("Sent message step:", data[index].step); } catch (err) { console.log("Error while sending message", err); @@ -121,7 +121,7 @@ async function receiveMessage() { while (deferredSteps.size > 0) { const step = lastProcessedRecipeStep + 1; const sequenceNumber = deferredSteps.get(step); - const message = await receiver.receiveDeferredMessage(sequenceNumber); + const [message] = await receiver.receiveDeferredMessages(sequenceNumber); if (message) { console.log("Process deferred message:", message.body); await message.complete(); diff --git a/sdk/servicebus/service-bus/samples/javascript/advanced/movingMessagesToDLQ.js b/sdk/servicebus/service-bus/samples/javascript/advanced/movingMessagesToDLQ.js index ca138fe02b17..cba6eff47262 100644 --- a/sdk/servicebus/service-bus/samples/javascript/advanced/movingMessagesToDLQ.js +++ b/sdk/servicebus/service-bus/samples/javascript/advanced/movingMessagesToDLQ.js @@ -45,7 +45,7 @@ async function sendMessage() { contentType: "application/json", label: "Recipe" }; - await sender.send(message); + await sender.sendMessages(message); await sender.close(); } @@ -53,7 +53,7 @@ async function receiveMessage() { // If receiving from a subscription you can use the createReceiver(topic, subscription) overload const receiver = sbClient.createReceiver(queueName, "peekLock"); - const messages = await receiver.receiveBatch(1); + const messages = await receiver.receiveMessages(1); if (messages.length) { console.log( diff --git a/sdk/servicebus/service-bus/samples/javascript/advanced/processMessageFromDLQ.js b/sdk/servicebus/service-bus/samples/javascript/advanced/processMessageFromDLQ.js index a624f30a0686..b4854765d5be 100644 --- a/sdk/servicebus/service-bus/samples/javascript/advanced/processMessageFromDLQ.js +++ b/sdk/servicebus/service-bus/samples/javascript/advanced/processMessageFromDLQ.js @@ -35,7 +35,7 @@ async function processDeadletterMessageQueue() { // If connecting to a subscription's dead letter queue you can use the createDeadLetterReceiver(topic, subscription) overload const receiver = sbClient.createDeadLetterReceiver(queueName, "peekLock"); - const messages = await receiver.receiveBatch(1); + const messages = await receiver.receiveMessages(1); if (messages.length > 0) { console.log(">>>>> Received the message from DLQ - ", messages[0].body); @@ -62,7 +62,7 @@ async function fixAndResendMessage(oldMessage) { console.log(">>>>> Cloning the message from DLQ and resending it - ", oldMessage.body); - await sender.send(repairedMessage); + await sender.sendMessages(repairedMessage); await sender.close(); } diff --git a/sdk/servicebus/service-bus/samples/javascript/advanced/sessionState.js b/sdk/servicebus/service-bus/samples/javascript/advanced/sessionState.js index daeeb72fdea9..4312236fced7 100644 --- a/sdk/servicebus/service-bus/samples/javascript/advanced/sessionState.js +++ b/sdk/servicebus/service-bus/samples/javascript/advanced/sessionState.js @@ -93,7 +93,7 @@ async function sendMessagesForSession(shoppingEvents, sessionId) { body: shoppingEvents[index], label: "Shopping Step" }; - await sender.send(message); + await sender.sendMessages(message); } await sender.close(); } @@ -103,7 +103,7 @@ async function processMessageFromSession(sessionId) { sessionId }); - const messages = await sessionReceiver.receiveBatch(1, { + const messages = await sessionReceiver.receiveMessages(1, { maxWaitTimeSeconds: 10 }); // Custom logic for processing the messages diff --git a/sdk/servicebus/service-bus/samples/javascript/receiveMessagesLoop.js b/sdk/servicebus/service-bus/samples/javascript/receiveMessagesLoop.js index 03d869040465..e6b2b53ebf93 100644 --- a/sdk/servicebus/service-bus/samples/javascript/receiveMessagesLoop.js +++ b/sdk/servicebus/service-bus/samples/javascript/receiveMessagesLoop.js @@ -5,7 +5,7 @@ **NOTE**: If you are using version 1.1.x or lower, then please use the link below: https://github.com/Azure/azure-sdk-for-js/tree/%40azure/service-bus_1.1.5/sdk/servicebus/service-bus/samples - This sample demonstrates how the receiveBatch() function can be used to receive Service Bus + This sample demonstrates how the receiveMessages() function can be used to receive Service Bus messages in a loop. Setup: Please run "sendMessages.ts" sample before running this to populate the queue/topic @@ -27,7 +27,7 @@ async function main() { try { for (let i = 0; i < 10; i++) { - const messages = await queueReceiver.receiveBatch(1, { + const messages = await queueReceiver.receiveMessages(1, { maxWaitTimeSeconds: 5 }); if (!messages.length) { diff --git a/sdk/servicebus/service-bus/samples/javascript/sendMessages.js b/sdk/servicebus/service-bus/samples/javascript/sendMessages.js index 57a6a3d1c660..8f60978298da 100644 --- a/sdk/servicebus/service-bus/samples/javascript/sendMessages.js +++ b/sdk/servicebus/service-bus/samples/javascript/sendMessages.js @@ -5,7 +5,7 @@ **NOTE**: If you are using version 1.1.x or lower, then please use the link below: https://github.com/Azure/azure-sdk-for-js/tree/%40azure/service-bus_1.1.5/sdk/servicebus/service-bus/samples - This sample demonstrates how the send() function can be used to send messages to Service Bus + This sample demonstrates how the sendMessages() method can be used to send messages to Service Bus Queue/Topic. See https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions @@ -48,7 +48,7 @@ async function main() { }; console.log(`Sending message: ${message.body} - ${message.label}`); - await sender.send(message); + await sender.sendMessages(message); } await sender.close(); diff --git a/sdk/servicebus/service-bus/samples/javascript/session.js b/sdk/servicebus/service-bus/samples/javascript/session.js index 0aca4d18448a..9334c9506a12 100644 --- a/sdk/servicebus/service-bus/samples/javascript/session.js +++ b/sdk/servicebus/service-bus/samples/javascript/session.js @@ -69,7 +69,7 @@ async function sendMessage(sbClient, scientist, sessionId) { }; console.log(`Sending message: "${message.body}" to "${sessionId}"`); - await sender.send(message); + await sender.sendMessages(message); await sender.close(); } diff --git a/sdk/servicebus/service-bus/samples/javascript/useProxy.js b/sdk/servicebus/service-bus/samples/javascript/useProxy.js index a96eb18df767..7a97b807313c 100644 --- a/sdk/servicebus/service-bus/samples/javascript/useProxy.js +++ b/sdk/servicebus/service-bus/samples/javascript/useProxy.js @@ -45,7 +45,7 @@ async function main() { console.log(`Sending message using proxy server ${proxyInfo}`); - await sender.send({ + await sender.sendMessages({ body: "sample message" }); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts b/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts index c13d74e6f80b..46d18722354d 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts @@ -54,7 +54,7 @@ async function sendMessages() { promises.push( delay(Math.random() * 30).then(async () => { try { - await sender.send(message); + await sender.sendMessages(message); console.log("Sent message step:", data[index].step); } catch (err) { console.log("Error while sending message", err); @@ -124,7 +124,7 @@ async function receiveMessage() { while (deferredSteps.size > 0) { const step = lastProcessedRecipeStep + 1; const sequenceNumber = deferredSteps.get(step); - const message = await receiver.receiveDeferredMessage(sequenceNumber); + const [message] = await receiver.receiveDeferredMessages(sequenceNumber); if (message) { console.log("Process deferred message:", message.body); await message.complete(); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/advanced/movingMessagesToDLQ.ts b/sdk/servicebus/service-bus/samples/typescript/src/advanced/movingMessagesToDLQ.ts index f3b8d34b60ab..4ddac58b8430 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/advanced/movingMessagesToDLQ.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/advanced/movingMessagesToDLQ.ts @@ -47,7 +47,7 @@ async function sendMessage() { contentType: "application/json", label: "Recipe" }; - await sender.send(message); + await sender.sendMessages(message); await sender.close(); } @@ -55,7 +55,7 @@ async function receiveMessage() { // If receiving from a subscription you can use the createReceiver(topic, subscription) overload const receiver = sbClient.createReceiver(queueName, "peekLock"); - const messages = await receiver.receiveBatch(1); + const messages = await receiver.receiveMessages(1); if (messages.length) { console.log( diff --git a/sdk/servicebus/service-bus/samples/typescript/src/advanced/processMessageFromDLQ.ts b/sdk/servicebus/service-bus/samples/typescript/src/advanced/processMessageFromDLQ.ts index a92111ab7826..3d6417e2c64b 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/advanced/processMessageFromDLQ.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/advanced/processMessageFromDLQ.ts @@ -37,7 +37,7 @@ async function processDeadletterMessageQueue() { // If connecting to a subscription's dead letter queue you can use the createDeadLetterReceiver(topic, subscription) overload const receiver = sbClient.createDeadLetterReceiver(queueName, "peekLock"); - const messages = await receiver.receiveBatch(1); + const messages = await receiver.receiveMessages(1); if (messages.length > 0) { console.log(">>>>> Received the message from DLQ - ", messages[0].body); @@ -64,7 +64,7 @@ async function fixAndResendMessage(oldMessage: ServiceBusMessage) { console.log(">>>>> Cloning the message from DLQ and resending it - ", oldMessage.body); - await sender.send(repairedMessage); + await sender.sendMessages(repairedMessage); await sender.close(); } diff --git a/sdk/servicebus/service-bus/samples/typescript/src/advanced/sessionState.ts b/sdk/servicebus/service-bus/samples/typescript/src/advanced/sessionState.ts index fe1dbe0b3240..d229b5281129 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/advanced/sessionState.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/advanced/sessionState.ts @@ -109,7 +109,7 @@ async function sendMessagesForSession(shoppingEvents: any[], sessionId: string) body: shoppingEvents[index], label: "Shopping Step" }; - await sender.send(message); + await sender.sendMessages(message); } await sender.close(); } @@ -120,7 +120,7 @@ async function processMessageFromSession(sessionId: string) { sessionId }); - const messages = await sessionReceiver.receiveBatch(1, { + const messages = await sessionReceiver.receiveMessages(1, { maxWaitTimeInMs: 10000 }); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/receiveMessagesLoop.ts b/sdk/servicebus/service-bus/samples/typescript/src/receiveMessagesLoop.ts index f6548bb73efa..4d2714caeb84 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/receiveMessagesLoop.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/receiveMessagesLoop.ts @@ -6,7 +6,7 @@ For samples using the current stable version of the package, please use the link below: https://github.com/Azure/azure-sdk-for-js/tree/%40azure/service-bus_1.1.5/sdk/servicebus/service-bus/samples - This sample demonstrates how the receiveBatch() function can be used to receive Service Bus + This sample demonstrates how the receiveMessages() function can be used to receive Service Bus messages in a loop. Setup: Please run "sendMessages.ts" sample before running this to populate the queue/topic @@ -33,7 +33,7 @@ export async function main() { // the sample in sessions.ts file try { for (let i = 0; i < 10; i++) { - const messages = await queueReceiver.receiveBatch(1, { + const messages = await queueReceiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/scheduledMessages.ts b/sdk/servicebus/service-bus/samples/typescript/src/scheduledMessages.ts index 5d4774a34e61..3ca3248deb8c 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/scheduledMessages.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/scheduledMessages.ts @@ -6,7 +6,7 @@ For samples using the current stable version of the package, please use the link below: https://github.com/Azure/azure-sdk-for-js/tree/%40azure/service-bus_1.1.5/sdk/servicebus/service-bus/samples - This sample demonstrates how the scheduleMessage() function can be used to schedule messages to + This sample demonstrates how the scheduleMessages() function can be used to schedule messages to appear on a Service Bus Queue/Subscription at a later time. See https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sequencing#scheduled-messages diff --git a/sdk/servicebus/service-bus/samples/typescript/src/sendMessages.ts b/sdk/servicebus/service-bus/samples/typescript/src/sendMessages.ts index 85cf53751096..e87348dabc98 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/sendMessages.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/sendMessages.ts @@ -6,7 +6,7 @@ For samples using the current stable version of the package, please use the link below: https://github.com/Azure/azure-sdk-for-js/tree/%40azure/service-bus_1.1.5/sdk/servicebus/service-bus/samples - This sample demonstrates how the send() function can be used to send messages to Service Bus + This sample demonstrates how the sendMessages() method can be used to send messages to Service Bus Queue/Topic. See https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions @@ -51,7 +51,7 @@ export async function main() { }; console.log(`Sending message: ${message.body} - ${message.label}`); - await sender.send(message); + await sender.sendMessages(message); } await sender.close(); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/session.ts b/sdk/servicebus/service-bus/samples/typescript/src/session.ts index 6e79e20971d2..ae7514acc3ce 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/session.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/session.ts @@ -73,7 +73,7 @@ async function sendMessage(sbClient: ServiceBusClient, scientist: any, sessionId }; console.log(`Sending message: "${message.body}" to "${sessionId}"`); - await sender.send(message); + await sender.sendMessages(message); await sender.close(); } diff --git a/sdk/servicebus/service-bus/samples/typescript/src/useProxy.ts b/sdk/servicebus/service-bus/samples/typescript/src/useProxy.ts index 0f4bdf12c006..7feb1bd1fe1c 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/useProxy.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/useProxy.ts @@ -48,7 +48,7 @@ export async function main() { console.log(`Sending message using proxy server ${proxyInfo}`); - await sender.send({ + await sender.sendMessages({ body: "sample message" }); diff --git a/sdk/servicebus/service-bus/src/index.ts b/sdk/servicebus/service-bus/src/index.ts index a958a1c8150f..f2d469e79cc7 100644 --- a/sdk/servicebus/service-bus/src/index.ts +++ b/sdk/servicebus/service-bus/src/index.ts @@ -22,7 +22,7 @@ export { MessageHandlerOptions, MessageHandlers, PeekMessagesOptions, - ReceiveBatchOptions, + ReceiveMessagesOptions, SenderOpenOptions, SubscribeOptions, WaitTimeOptions diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index 0e0e2d2442e4..2b2fbdf238b0 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -56,7 +56,7 @@ export interface CreateBatchOptions extends OperationOptions { /** * Options when receiving a batch of messages from Service Bus. */ -export interface ReceiveBatchOptions extends OperationOptions, WaitTimeOptions {} +export interface ReceiveMessagesOptions extends OperationOptions, WaitTimeOptions {} /** * Options when getting an iterable iterator from Service Bus. diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index e263b94f628f..3a13aa47c805 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -5,7 +5,7 @@ import { PeekMessagesOptions, GetMessageIteratorOptions, MessageHandlers, - ReceiveBatchOptions, + ReceiveMessagesOptions, SubscribeOptions } from "../models"; import { OperationOptions } from "../modelsToBeSharedWithEventHubs"; @@ -16,8 +16,7 @@ import { getReceiverClosedErrorMsg, throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing, - throwTypeErrorIfParameterNotLong, - throwTypeErrorIfParameterNotLongArray + throwTypeErrorIfParameterNotLong } from "../util/errors"; import * as log from "../log"; import { OnError, OnMessage, ReceiveOptions } from "../core/messageReceiver"; @@ -50,28 +49,13 @@ export interface Receiver { /** * Receives, at most, `maxMessages` worth of messages. * @param maxMessages The maximum number of messages to accept. - * @param options Options for receiveBatch. + * @param options Options for receiveMessages */ - receiveBatch(maxMessages: number, options?: ReceiveBatchOptions): Promise; - - /** - * Returns a promise that resolves to a deferred message identified by the given `sequenceNumber`. - * @param {Long} sequenceNumber The sequence number of the message that needs to be received. - * @param options - Options bag to pass an abort signal or tracing options. - * @returns {(Promise)} - * - Returns `Message` identified by sequence number. - * - Returns `undefined` if no such message is found. - * @throws Error if the underlying connection or receiver is closed. - * @throws MessagingError if the service returns an error while receiving deferred message. - */ - receiveDeferredMessage( - sequenceNumber: Long, - options?: OperationOptions - ): Promise; + receiveMessages(maxMessages: number, options?: ReceiveMessagesOptions): Promise; /** * Returns a promise that resolves to an array of deferred messages identified by given `sequenceNumbers`. - * @param {Long[]} sequenceNumbers An array of sequence numbers for the messages that need to be received. + * @param sequenceNumbers The sequence number or an array of sequence numbers for the messages that need to be received. * @param options - Options bag to pass an abort signal or tracing options. * @returns {Promise} * - Returns a list of messages identified by the given sequenceNumbers. @@ -80,7 +64,7 @@ export interface Receiver { * @throws MessagingError if the service returns an error while receiving deferred messages. */ receiveDeferredMessages( - sequenceNumbers: Long[], + sequenceNumbers: Long | Long[], options?: OperationOptions ): Promise; /** @@ -270,9 +254,9 @@ export class ReceiverImpl { this._throwIfReceiverOrConnectionClosed(); this._throwIfAlreadyReceiving(); @@ -320,58 +304,9 @@ export class ReceiverImpl - * - Returns `Message` identified by sequence number. - * - Returns `undefined` if no such message is found. - * @throws Error if the underlying connection, client or receiver is closed. - * @throws MessagingError if the service returns an error while receiving deferred message. - */ - async receiveDeferredMessage( - sequenceNumber: Long, - options: OperationOptions = {} - ): Promise { - this._throwIfReceiverOrConnectionClosed(); - throwTypeErrorIfParameterMissing( - this._context.namespace.connectionId, - "sequenceNumber", - sequenceNumber - ); - throwTypeErrorIfParameterNotLong( - this._context.namespace.connectionId, - "sequenceNumber", - sequenceNumber - ); - - const receiveDeferredMessageOperationPromise = async () => { - const messages = await this._context.managementClient!.receiveDeferredMessages( - [sequenceNumber], - convertToInternalReceiveMode(this.receiveMode), - undefined, - { - ...options, - requestName: "receiveDeferredMessage", - timeoutInMs: this._retryOptions.timeoutInMs - } - ); - return (messages[0] as unknown) as ReceivedMessageT; - }; - const config: RetryConfig = { - operation: receiveDeferredMessageOperationPromise, - connectionId: this._context.namespace.connectionId, - operationType: RetryOperationType.management, - retryOptions: this._retryOptions, - abortSignal: options?.abortSignal - }; - return retry(config); - } - /** * Returns a promise that resolves to an array of deferred messages identified by given `sequenceNumbers`. - * @param sequenceNumbers An array of sequence numbers for the messages that need to be received. + * @param sequenceNumbers The sequence number or an array of sequence numbers for the messages that need to be received. * @param options - Options bag to pass an abort signal or tracing options. * @returns Promise * - Returns a list of messages identified by the given sequenceNumbers. @@ -380,7 +315,7 @@ export class ReceiverImpl { this._throwIfReceiverOrConnectionClosed(); @@ -389,18 +324,18 @@ export class ReceiverImpl { const deferredMessages = await this._context.managementClient!.receiveDeferredMessages( - sequenceNumbers, + deferredSequenceNumbers, convertToInternalReceiveMode(this.receiveMode), undefined, { diff --git a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts index 1f1da0107f23..2437657792a4 100644 --- a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts @@ -4,7 +4,7 @@ import { ClientEntityContext } from "../clientEntityContext"; import { MessageHandlers, - ReceiveBatchOptions, + ReceiveMessagesOptions, ReceivedMessage, SessionMessageHandlerOptions, SubscribeOptions @@ -21,8 +21,7 @@ import { getReceiverClosedErrorMsg, throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing, - throwTypeErrorIfParameterNotLong, - throwTypeErrorIfParameterNotLongArray + throwTypeErrorIfParameterNotLong } from "../util/errors"; import * as log from "../log"; import { OnError, OnMessage } from "../core/messageReceiver"; @@ -370,59 +369,9 @@ export class SessionReceiverImpl(config); } - /** - * Returns a promise that resolves to a deferred message identified by the given `sequenceNumber`. - * @param sequenceNumber The sequence number of the message that needs to be received. - * @param options - Options bag to pass an abort signal or tracing options. - * @returns Promise - * - Returns `Message` identified by sequence number. - * - Returns `undefined` if no such message is found. - * @throws Error if the underlying connection or receiver is closed. - * @throws MessagingError if the service returns an error while receiving deferred message. - */ - async receiveDeferredMessage( - sequenceNumber: Long, - options: OperationOptions = {} - ): Promise { - this._throwIfReceiverOrConnectionClosed(); - throwTypeErrorIfParameterMissing( - this._context.namespace.connectionId, - "sequenceNumber", - sequenceNumber - ); - throwTypeErrorIfParameterNotLong( - this._context.namespace.connectionId, - "sequenceNumber", - sequenceNumber - ); - - const receiveDeferredMessageOperationPromise = async () => { - await this._createMessageSessionIfDoesntExist(); - const messages = await this._context.managementClient!.receiveDeferredMessages( - [sequenceNumber], - convertToInternalReceiveMode(this.receiveMode), - this.sessionId, - { - ...options, - requestName: "receiveDeferredMessage", - timeoutInMs: this._retryOptions.timeoutInMs - } - ); - return (messages[0] as unknown) as ReceivedMessageT; - }; - const config: RetryConfig = { - operation: receiveDeferredMessageOperationPromise, - connectionId: this._context.namespace.connectionId, - operationType: RetryOperationType.management, - retryOptions: this._retryOptions, - abortSignal: options?.abortSignal - }; - return retry(config); - } - /** * Returns a promise that resolves to an array of deferred messages identified by given `sequenceNumbers`. - * @param sequenceNumbers An array of sequence numbers for the messages that need to be received. + * @param sequenceNumbers The sequence number or an array of sequence numbers for the messages that need to be received. * @param options - Options bag to pass an abort signal or tracing options. * @returns Promise * - Returns a list of messages identified by the given sequenceNumbers. @@ -431,7 +380,7 @@ export class SessionReceiverImpl { this._throwIfReceiverOrConnectionClosed(); @@ -440,19 +389,19 @@ export class SessionReceiverImpl { await this._createMessageSessionIfDoesntExist(); const deferredMessages = await this._context.managementClient!.receiveDeferredMessages( - sequenceNumbers, + deferredSequenceNumbers, convertToInternalReceiveMode(this.receiveMode), this.sessionId, { @@ -488,9 +437,9 @@ export class SessionReceiverImpl { this._throwIfReceiverOrConnectionClosed(); this._throwIfAlreadyReceiving(); diff --git a/sdk/servicebus/service-bus/src/receivers/shared.ts b/sdk/servicebus/service-bus/src/receivers/shared.ts index d7e65bd98e2b..50dcc99d2fa3 100644 --- a/sdk/servicebus/service-bus/src/receivers/shared.ts +++ b/sdk/servicebus/service-bus/src/receivers/shared.ts @@ -29,7 +29,7 @@ export async function* getMessageIterator( options?: GetMessageIteratorOptions ): AsyncIterableIterator { while (true) { - const messages = await receiver.receiveBatch(1, options); + const messages = await receiver.receiveMessages(1, options); // In EventHubs we've had a concept of "punctuation" (thanks @jsquire) that // allows the user, when working in a model like this, to get a periodic "no message diff --git a/sdk/servicebus/service-bus/src/sender.ts b/sdk/servicebus/service-bus/src/sender.ts index 303912db540c..a72f3484d59a 100644 --- a/sdk/servicebus/service-bus/src/sender.ts +++ b/sdk/servicebus/service-bus/src/sender.ts @@ -10,8 +10,7 @@ import { getSenderClosedErrorMsg, throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing, - throwTypeErrorIfParameterNotLong, - throwTypeErrorIfParameterNotLongArray + throwTypeErrorIfParameterNotLong } from "./util/errors"; import { ServiceBusMessageBatch } from "./serviceBusMessageBatch"; import { CreateBatchOptions, SenderOpenOptions } from "./models"; @@ -32,50 +31,25 @@ import { OperationOptions } from "./modelsToBeSharedWithEventHubs"; */ export interface Sender { /** - * Sends the given message after creating an AMQP Sender link if it doesnt already exists. - * - * To send a message to a `session` and/or `partition` enabled Queue/Topic, set the `sessionId` - * and/or `partitionKey` properties respectively on the message. - * - * @param message - Message to send. - * @param options - Options bag to pass an abort signal or tracing options. - * @returns Promise - * @throws Error if the underlying connection, client or sender is closed. - * @throws MessagingError if the service returns an error while sending messages to the service. - */ - send(message: ServiceBusMessage, options?: OperationOptions): Promise; - /** - * Sends the given messages in a single batch i.e. in a single AMQP message after creating an AMQP - * Sender link if it doesn't already exist. + * Sends the given messages after creating an AMQP Sender link if it doesn't already exist. + * Consider awaiting on open() beforehand to front load the work of link creation if needed. * * - To send messages to a `session` and/or `partition` enabled Queue/Topic, set the `sessionId` * and/or `partitionKey` properties respectively on the messages. - * - When doing so, all messages in the batch should have the same `sessionId` (if using + * - All messages passed to the same sendMessages() call should have the same `sessionId` (if using * sessions) and the same `partitionKey` (if using partitions). * - * @param messages - An array of ServiceBusMessage objects to be sent in a Batch message. + * @param messages - A single message or an array of messages or a batch of messages created via the createBatch() + * method to send. * @param options - Options bag to pass an abort signal or tracing options. * @return Promise * @throws Error if the underlying connection, client or sender is closed. * @throws MessagingError if the service returns an error while sending messages to the service. */ - send(messages: ServiceBusMessage[], options?: OperationOptions): Promise; - /** - * Sends a batch of messages to the associated service-bus entity after creating an AMQP - * Sender link if it doesn't already exist. - * - * - To send messages to a `session` and/or `partition` enabled Queue/Topic, set the `sessionId` - * and/or `partitionKey` properties respectively on the messages. - * - When doing so, all messages in the batch should have the same `sessionId` (if using - * sessions) and the same `partitionKey` (if using partitions). - * - * @param {ServiceBusMessageBatch} messageBatch A batch of messages that you can create using the {@link createBatch} method. - * @param options - Options bag to pass an abort signal or tracing options. - * @returns {Promise} - * @throws MessagingError if an error is encountered while sending a message. - * @throws Error if the underlying connection or sender has been closed. - */ - send(messageBatch: ServiceBusMessageBatch, options?: OperationOptions): Promise; + sendMessages( + messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch, + options?: OperationOptions + ): Promise; /** * Creates an instance of `ServiceBusMessageBatch` to which one can add messages until the maximum supported size is reached. @@ -94,7 +68,7 @@ export interface Sender { * Opens the AMQP link to Azure Service Bus from the sender. * * It is not necessary to call this method in order to use the sender. It is - * recommended to call this before your first send() or sendBatch() call if you + * recommended to call this before your first sendMessages() call if you * want to front load the work of setting up the AMQP link to the service. * * @param options - Options bag to pass an abort signal. @@ -106,30 +80,12 @@ export interface Sender { * @readonly */ isClosed: boolean; - /** - * Schedules given message to appear on Service Bus Queue/Subscription at a later time. - * - * @param scheduledEnqueueTimeUtc - The UTC time at which the message should be enqueued. - * @param message - The message that needs to be scheduled. - * @param options - Options bag to pass an abort signal or tracing options. - * @returns Promise - The sequence number of the message that was scheduled. - * You will need the sequence number if you intend to cancel the scheduling of the message. - * Save the `Long` type as-is in your application without converting to number. Since JavaScript - * only supports 53 bit numbers, converting the `Long` to number will cause loss in precision. - * @throws Error if the underlying connection, client or sender is closed. - * @throws MessagingError if the service returns an error while scheduling a message. - */ - scheduleMessage( - scheduledEnqueueTimeUtc: Date, - message: ServiceBusMessage, - options?: OperationOptions - ): Promise; /** * Schedules given messages to appear on Service Bus Queue/Subscription at a later time. * * @param scheduledEnqueueTimeUtc - The UTC time at which the messages should be enqueued. - * @param messages - Array of Messages that need to be scheduled. + * @param messages - Message or an array of messages that need to be scheduled. * @param options - Options bag to pass an abort signal or tracing options. * @returns Promise - The sequence numbers of messages that were scheduled. * You will need the sequence number if you intend to cancel the scheduling of the messages. @@ -140,28 +96,22 @@ export interface Sender { */ scheduleMessages( scheduledEnqueueTimeUtc: Date, - messages: ServiceBusMessage[], + messages: ServiceBusMessage | ServiceBusMessage[], options?: OperationOptions ): Promise; - /** - * Cancels a message that was scheduled to appear on a ServiceBus Queue/Subscription. - * @param sequenceNumber - The sequence number of the message to be cancelled. - * @param options - Options bag to pass an abort signal or tracing options. - * @returns Promise - * @throws Error if the underlying connection, client or sender is closed. - * @throws MessagingError if the service returns an error while canceling a scheduled message. - */ - cancelScheduledMessage(sequenceNumber: Long, options?: OperationOptions): Promise; /** * Cancels multiple messages that were scheduled to appear on a ServiceBus Queue/Subscription. - * @param sequenceNumbers - An Array of sequence numbers of the messages to be cancelled. + * @param sequenceNumbers - Sequence number or an array of sequence numbers of the messages to be cancelled. * @param options - Options bag to pass an abort signal or tracing options. * @returns Promise * @throws Error if the underlying connection, client or sender is closed. * @throws MessagingError if the service returns an error while canceling scheduled messages. */ - cancelScheduledMessages(sequenceNumbers: Long[], options?: OperationOptions): Promise; + cancelScheduledMessages( + sequenceNumbers: Long | Long[], + options?: OperationOptions + ): Promise; /** * Path of the entity for which the sender has been created. */ @@ -221,19 +171,16 @@ export class SenderImpl implements Sender { return this._isClosed || this._context.isClosed; } - async send(message: ServiceBusMessage, options?: OperationOptions): Promise; - async send(messages: ServiceBusMessage[], options?: OperationOptions): Promise; - async send(messageBatch: ServiceBusMessageBatch, options?: OperationOptions): Promise; - async send( - messageOrMessagesOrBatch: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch, + async sendMessages( + messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch, options?: OperationOptions ): Promise { this._throwIfSenderOrConnectionClosed(); - if (Array.isArray(messageOrMessagesOrBatch)) { + if (Array.isArray(messages)) { const batch = await this.createBatch(options); - for (const message of messageOrMessagesOrBatch) { + for (const message of messages) { if (!batch.tryAdd(message)) { // this is too big - throw an error const error = new MessagingError( @@ -245,10 +192,10 @@ export class SenderImpl implements Sender { } return this._sender.sendBatch(batch, options); - } else if (isServiceBusMessageBatch(messageOrMessagesOrBatch)) { - return this._sender.sendBatch(messageOrMessagesOrBatch, options); - } else if (isServiceBusMessage(messageOrMessagesOrBatch)) { - return this._sender.send(messageOrMessagesOrBatch, options); + } else if (isServiceBusMessageBatch(messages)) { + return this._sender.sendBatch(messages, options); + } else if (isServiceBusMessage(messages)) { + return this._sender.send(messages, options); } else { throw new TypeError( "Invalid type for message. Must be a ServiceBusMessage, an array of ServiceBusMessage or a ServiceBusMessageBatch" @@ -261,77 +208,30 @@ export class SenderImpl implements Sender { return this._sender.createBatch(options); } - /** - * Schedules given message to appear on Service Bus Queue/Subscription at a later time. - * - * @param scheduledEnqueueTimeUtc - The UTC time at which the message should be enqueued. - * @param message - The message that needs to be scheduled. - * @param options - Options bag to pass an abort signal or tracing options. - * @returns Promise - The sequence number of the message that was scheduled. - * You will need the sequence number if you intend to cancel the scheduling of the message. - * Save the `Long` type as-is in your application without converting to number. Since JavaScript - * only supports 53 bit numbers, converting the `Long` to number will cause loss in precision. - * @throws Error if the underlying connection, client or sender is closed. - * @throws MessagingError if the service returns an error while scheduling a message. - */ - async scheduleMessage( + async scheduleMessages( scheduledEnqueueTimeUtc: Date, - message: ServiceBusMessage, + messages: ServiceBusMessage | ServiceBusMessage[], options: OperationOptions = {} - ): Promise { + ): Promise { this._throwIfSenderOrConnectionClosed(); throwTypeErrorIfParameterMissing( this._context.namespace.connectionId, "scheduledEnqueueTimeUtc", scheduledEnqueueTimeUtc ); - throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "message", message); + throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "messages", messages); + const messagesToSchedule = Array.isArray(messages) ? messages : [messages]; const scheduleMessageOperationPromise = async () => { - const result = await this._context.managementClient!.scheduleMessages( + return this._context.managementClient!.scheduleMessages( scheduledEnqueueTimeUtc, - [message], + messagesToSchedule, { ...options, - requestName: "scheduleMessage", + requestName: "scheduleMessages", timeoutInMs: this._retryOptions.timeoutInMs } ); - return result[0]; - }; - - const config: RetryConfig = { - operation: scheduleMessageOperationPromise, - connectionId: this._context.namespace.connectionId, - operationType: RetryOperationType.management, - retryOptions: this._retryOptions, - abortSignal: options?.abortSignal - }; - return retry(config); - } - - async scheduleMessages( - scheduledEnqueueTimeUtc: Date, - messages: ServiceBusMessage[], - options: OperationOptions = {} - ): Promise { - this._throwIfSenderOrConnectionClosed(); - throwTypeErrorIfParameterMissing( - this._context.namespace.connectionId, - "scheduledEnqueueTimeUtc", - scheduledEnqueueTimeUtc - ); - throwTypeErrorIfParameterMissing(this._context.namespace.connectionId, "messages", messages); - if (!Array.isArray(messages)) { - messages = [messages]; - } - - const scheduleMessageOperationPromise = async () => { - return this._context.managementClient!.scheduleMessages(scheduledEnqueueTimeUtc, messages, { - ...options, - requestName: "scheduleMessages", - timeoutInMs: this._retryOptions.timeoutInMs - }); }; const config: RetryConfig = { operation: scheduleMessageOperationPromise, @@ -343,41 +243,8 @@ export class SenderImpl implements Sender { return retry(config); } - async cancelScheduledMessage( - sequenceNumber: Long, - options: OperationOptions = {} - ): Promise { - this._throwIfSenderOrConnectionClosed(); - throwTypeErrorIfParameterMissing( - this._context.namespace.connectionId, - "sequenceNumber", - sequenceNumber - ); - throwTypeErrorIfParameterNotLong( - this._context.namespace.connectionId, - "sequenceNumber", - sequenceNumber - ); - - const cancelSchedulesMessagesOperationPromise = async () => { - return this._context.managementClient!.cancelScheduledMessages([sequenceNumber], { - ...options, - requestName: "cancelScheduledMessage", - timeoutInMs: this._retryOptions.timeoutInMs - }); - }; - const config: RetryConfig = { - operation: cancelSchedulesMessagesOperationPromise, - connectionId: this._context.namespace.connectionId, - operationType: RetryOperationType.management, - retryOptions: this._retryOptions, - abortSignal: options?.abortSignal - }; - return retry(config); - } - async cancelScheduledMessages( - sequenceNumbers: Long[], + sequenceNumbers: Long | Long[], options: OperationOptions = {} ): Promise { this._throwIfSenderOrConnectionClosed(); @@ -386,17 +253,17 @@ export class SenderImpl implements Sender { "sequenceNumbers", sequenceNumbers ); - if (!Array.isArray(sequenceNumbers)) { - sequenceNumbers = [sequenceNumbers]; - } - throwTypeErrorIfParameterNotLongArray( + throwTypeErrorIfParameterNotLong( this._context.namespace.connectionId, "sequenceNumbers", sequenceNumbers ); + const sequenceNumbersToCancel = Array.isArray(sequenceNumbers) + ? sequenceNumbers + : [sequenceNumbers]; const cancelSchedulesMessagesOperationPromise = async () => { - return this._context.managementClient!.cancelScheduledMessages(sequenceNumbers, { + return this._context.managementClient!.cancelScheduledMessages(sequenceNumbersToCancel, { ...options, requestName: "cancelScheduledMessages", timeoutInMs: this._retryOptions.timeoutInMs diff --git a/sdk/servicebus/service-bus/src/util/errors.ts b/sdk/servicebus/service-bus/src/util/errors.ts index 354c9dd54311..0737f9d66a3c 100644 --- a/sdk/servicebus/service-bus/src/util/errors.ts +++ b/sdk/servicebus/service-bus/src/util/errors.ts @@ -160,7 +160,7 @@ export function throwTypeErrorIfParameterTypeMismatch( /** * @internal - * Logs and Throws TypeError if given parameter is not of type `Long` + * Logs and Throws TypeError if given parameter is not of type `Long` or an array of type `Long` * @param connectionId Id of the underlying AMQP connection used for logging * @param parameterName Name of the parameter to type check * @param parameterValue Value of the parameter to type check @@ -170,6 +170,9 @@ export function throwTypeErrorIfParameterNotLong( parameterName: string, parameterValue: any ): TypeError | undefined { + if (Array.isArray(parameterValue)) { + return throwTypeErrorIfParameterNotLongArray(connectionId, parameterName, parameterValue); + } if (Long.isLong(parameterValue)) { return; } diff --git a/sdk/servicebus/service-bus/test/backupMessageSettlement.spec.ts b/sdk/servicebus/service-bus/test/backupMessageSettlement.spec.ts index d9c298435fe6..343559e030fd 100644 --- a/sdk/servicebus/service-bus/test/backupMessageSettlement.spec.ts +++ b/sdk/servicebus/service-bus/test/backupMessageSettlement.spec.ts @@ -57,8 +57,8 @@ describe("Backup message settlement - Through ManagementLink", () => { async function sendReceiveMsg( testMessages: ServiceBusMessage ): Promise { - await sender.send(testMessages); - const msgs = await receiver.receiveBatch(1); + await sender.sendMessages(testMessages); + const msgs = await receiver.receiveMessages(1); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Unexpected number of messages"); @@ -95,7 +95,7 @@ describe("Backup message settlement - Through ManagementLink", () => { receiver = await serviceBusClient.test.getPeekLockReceiver(entityNames); if (entityNames.usesSessions) { should.equal(errorWasThrown, true, "Error was not thrown for messages with session-id"); - const msgBatch = await receiver.receiveBatch(1); + const msgBatch = await receiver.receiveMessages(1); await msgBatch[0].complete(); } else { should.equal(errorWasThrown, false, "Error was thrown for sessions without session-id"); @@ -181,7 +181,7 @@ describe("Backup message settlement - Through ManagementLink", () => { receiver = await serviceBusClient.test.getPeekLockReceiver(entityNames); await testPeekMsgsLength(receiver, 1); - const messageBatch = await receiver.receiveBatch(1); + const messageBatch = await receiver.receiveMessages(1); await messageBatch[0].complete(); @@ -276,13 +276,13 @@ describe("Backup message settlement - Through ManagementLink", () => { } receiver = await serviceBusClient.test.getPeekLockReceiver(entityNames); if (!entityNames.usesSessions) { - const deferredMsgs = await receiver.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { + const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber); + if (!deferredMsg) { throw "No message received for sequence number"; } - await deferredMsgs.complete(); + await deferredMsg.complete(); } else { - const messageBatch = await receiver.receiveBatch(1); + const messageBatch = await receiver.receiveMessages(1); await messageBatch[0].complete(); } await testPeekMsgsLength(receiver, 0); @@ -369,7 +369,7 @@ describe("Backup message settlement - Through ManagementLink", () => { receiver = await serviceBusClient.test.getPeekLockReceiver(entityNames); if (!entityNames.usesSessions) { - const deadLetterMsgsBatch = await deadLetterReceiver.receiveBatch(1); + const deadLetterMsgsBatch = await deadLetterReceiver.receiveMessages(1); should.equal( Array.isArray(deadLetterMsgsBatch), @@ -392,7 +392,7 @@ describe("Backup message settlement - Through ManagementLink", () => { await testPeekMsgsLength(deadLetterReceiver, 0); } else { - const messageBatch = await receiver.receiveBatch(1); + const messageBatch = await receiver.receiveMessages(1); await messageBatch[0].complete(); await testPeekMsgsLength(receiver, 0); @@ -485,7 +485,7 @@ describe("Backup message settlement - Through ManagementLink", () => { receiver = await serviceBusClient.test.getPeekLockReceiver(entityNames); if (entityNames.usesSessions) { should.equal(errorWasThrown, true, "Error was not thrown for messages with session-id"); - const msgBatch = await receiver.receiveBatch(1); + const msgBatch = await receiver.receiveMessages(1); await msgBatch[0].complete(); } else { should.equal(errorWasThrown, false, "Error was thrown for sessions without session-id"); diff --git a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts index 3b8066dcb047..dec860e9b9aa 100644 --- a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts @@ -62,8 +62,8 @@ describe("batchReceiver", () => { async function sendReceiveMsg( testMessages: ServiceBusMessage ): Promise { - await sender.send(testMessages); - const msgs = await receiver.receiveBatch(1); + await sender.sendMessages(testMessages); + const msgs = await receiver.receiveMessages(1); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Unexpected number of messages"); @@ -142,7 +142,7 @@ describe("batchReceiver", () => { await testPeekMsgsLength(receiver, 1); - const messageBatch = await receiver.receiveBatch(1); + const messageBatch = await receiver.receiveMessages(1); should.equal(messageBatch.length, 1, "Unexpected number of messages"); should.equal(messageBatch[0].deliveryCount, 1, "DeliveryCount is different than expected"); @@ -215,11 +215,11 @@ describe("batchReceiver", () => { async function testAbandonMsgsTillMaxDeliveryCount(useSessions?: boolean): Promise { const testMessages = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); - await sender.send(testMessages); + await sender.sendMessages(testMessages); let abandonMsgCount = 0; while (abandonMsgCount < maxDeliveryCount) { - const batch = await receiver.receiveBatch(1); + const batch = await receiver.receiveMessages(1); should.equal(batch.length, 1, "Unexpected number of messages"); should.equal( @@ -239,7 +239,7 @@ describe("batchReceiver", () => { await testPeekMsgsLength(receiver, 0); - const deadLetterMsgsBatch = await deadLetterReceiver.receiveBatch(1); + const deadLetterMsgsBatch = await deadLetterReceiver.receiveMessages(1); should.equal( Array.isArray(deadLetterMsgsBatch), @@ -329,19 +329,19 @@ describe("batchReceiver", () => { const sequenceNumber = msg.sequenceNumber; await msg.defer(); - const deferredMsgs = await receiver.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { + const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber); + if (!deferredMsg) { throw "No message received for sequence number"; } - should.equal(deferredMsgs.body, testMessages.body, "MessageBody is different than expected"); + should.equal(deferredMsg.body, testMessages.body, "MessageBody is different than expected"); should.equal( - deferredMsgs.messageId, + deferredMsg.messageId, testMessages.messageId, "MessageId is different than expected" ); - should.equal(deferredMsgs.deliveryCount, 1, "DeliveryCount is different than expected"); + should.equal(deferredMsg.deliveryCount, 1, "DeliveryCount is different than expected"); - await deferredMsgs.complete(); + await deferredMsg.complete(); await testPeekMsgsLength(receiver, 0); } @@ -409,7 +409,7 @@ describe("batchReceiver", () => { await testPeekMsgsLength(receiver, 0); - const deadLetterMsgsBatch = await deadLetterReceiver.receiveBatch(1); + const deadLetterMsgsBatch = await deadLetterReceiver.receiveMessages(1); should.equal( Array.isArray(deadLetterMsgsBatch), @@ -498,8 +498,8 @@ describe("batchReceiver", () => { async function deadLetterMessage( testMessage: ServiceBusMessage ): Promise { - await sender.send(testMessage); - const batch = await receiver.receiveBatch(1); + await sender.sendMessages(testMessage); + const batch = await receiver.receiveMessages(1); should.equal(batch.length, 1, "Unexpected number of messages"); should.equal(batch[0].body, testMessage.body, "MessageBody is different than expected"); @@ -514,7 +514,7 @@ describe("batchReceiver", () => { await testPeekMsgsLength(receiver, 0); - const deadLetterMsgsBatch = await deadLetterReceiver.receiveBatch(1); + const deadLetterMsgsBatch = await deadLetterReceiver.receiveMessages(1); should.equal(deadLetterMsgsBatch.length, 1, "Unexpected number of messages"); should.equal( @@ -541,7 +541,7 @@ describe("batchReceiver", () => { deadletterClient: Receiver, expectedDeliverCount: number ): Promise { - const deadLetterMsgsBatch = await deadLetterReceiver.receiveBatch(1); + const deadLetterMsgsBatch = await deadLetterReceiver.receiveMessages(1); should.equal(deadLetterMsgsBatch.length, 1, "Unexpected number of messages"); should.equal( @@ -651,18 +651,18 @@ describe("batchReceiver", () => { const sequenceNumber = deadLetterMsg.sequenceNumber; await deadLetterMsg.defer(); - const deferredMsgs = await deadLetterReceiver.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { + const [deferredMsg] = await deadLetterReceiver.receiveDeferredMessages(sequenceNumber); + if (!deferredMsg) { throw "No message received for sequence number"; } - should.equal(deferredMsgs.body, testMessage.body, "MessageBody is different than expected"); + should.equal(deferredMsg.body, testMessage.body, "MessageBody is different than expected"); should.equal( - deferredMsgs.messageId, + deferredMsg.messageId, testMessage.messageId, "MessageId is different than expected" ); - await deferredMsgs.complete(); + await deferredMsg.complete(); await testPeekMsgsLength(receiver, 0); @@ -705,7 +705,7 @@ describe("batchReceiver", () => { // We use an empty queue/topic here so that the first receiveMessages call takes time to return async function testParallelReceiveCalls(useSessions?: boolean): Promise { - const firstBatchPromise = receiver.receiveBatch(1, { maxWaitTimeInMs: 10000 }); + const firstBatchPromise = receiver.receiveMessages(1, { maxWaitTimeInMs: 10000 }); await delay(5000); let errorMessage; @@ -715,7 +715,7 @@ describe("batchReceiver", () => { ); try { - await receiver.receiveBatch(1); + await receiver.receiveMessages(1); } catch (err) { errorMessage = err && err.message; } @@ -799,9 +799,9 @@ describe("batchReceiver", () => { for (const message of testMessages) { batchMessageToSend.tryAdd(message); } - await sender.send(batchMessageToSend); - const msgs1 = await receiver.receiveBatch(1); - const msgs2 = await receiver.receiveBatch(1); + await sender.sendMessages(batchMessageToSend); + const msgs1 = await receiver.receiveMessages(1); + const msgs2 = await receiver.receiveMessages(1); // Results are checked after both receiveMessages are done to ensure that the second call doesnt // affect the result from the first one. @@ -848,9 +848,9 @@ describe("batchReceiver", () => { async function testNoSettlement(useSessions?: boolean): Promise { const testMessages = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); - await sender.send(testMessages); + await sender.sendMessages(testMessages); - let batch = await receiver.receiveBatch(1); + let batch = await receiver.receiveMessages(1); should.equal(batch.length, 1, "Unexpected number of messages"); should.equal(batch[0].deliveryCount, 0, "DeliveryCount is different than expected"); @@ -862,7 +862,7 @@ describe("batchReceiver", () => { await testPeekMsgsLength(receiver, 1); - batch = await receiver.receiveBatch(1); + batch = await receiver.receiveMessages(1); should.equal(batch.length, 1, "Unexpected number of messages"); should.equal(batch[0].deliveryCount, 1, "DeliveryCount is different than expected"); @@ -905,8 +905,8 @@ describe("batchReceiver", () => { async function testAskForMore(useSessions?: boolean): Promise { const testMessages = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); - await sender.send(testMessages); - const batch = await receiver.receiveBatch(2); + await sender.sendMessages(testMessages); + const batch = await receiver.receiveMessages(2); should.equal(batch.length, 1, "Unexpected number of messages"); should.equal(batch[0].body, testMessages.body, "MessageBody is different than expected"); @@ -983,20 +983,6 @@ describe("batchReceiver", () => { }); describe("Cancel operations on the receiver", function(): void { - it("Abort receiveDeferredMessage request on the receiver", async function(): Promise { - await beforeEachTest(TestClientType.PartitionedQueue); - const controller = new AbortController(); - setTimeout(() => controller.abort(), 1); - try { - await receiver.receiveDeferredMessage(Long.ZERO, { abortSignal: controller.signal }); - throw new Error(`Test failure`); - } catch (err) { - err.message.should.equal( - "The receiveDeferredMessage operation has been cancelled by the user." - ); - } - }); - it("Abort receiveDeferredMessages request on the receiver", async function(): Promise { await beforeEachTest(TestClientType.PartitionedQueueWithSessions); const controller = new AbortController(); @@ -1057,11 +1043,11 @@ describe("Batching - disconnects", function(): void { await beforeEachTest(TestClientType.UnpartitionedQueue); // Send a message so we can be sure when the receiver is open and active. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); let settledMessageCount = 0; - const messages1 = await (receiver as Receiver).receiveBatch(1, { + const messages1 = await (receiver as Receiver).receiveMessages(1, { maxWaitTimeInMs: 5000 }); for (const message of messages1) { @@ -1086,10 +1072,10 @@ describe("Batching - disconnects", function(): void { // Otherwise, it will get into a bad internal state with uncaught exceptions. await delay(2000); // send a second message to trigger the message handler again. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); // wait for the 2nd message to be received. - const messages2 = await (receiver as Receiver).receiveBatch(1, { + const messages2 = await (receiver as Receiver).receiveMessages(1, { maxWaitTimeInMs: 5000 }); for (const message of messages2) { @@ -1109,14 +1095,14 @@ describe("Batching - disconnects", function(): void { // The first time `receiveMessages` is called the receiver link is created. // The `receiver_drained` handler is only added after the link is created, // which is a non-blocking task. - await receiver.receiveBatch(1, { maxWaitTimeInMs: 1000 }); + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); const receiverContext = (receiver as ReceiverImpl)["_context"]; if (!receiverContext.batchingReceiver!.isOpen()) { throw new Error(`Unable to initialize receiver link.`); } // Send a message so we have something to receive. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); // Since the receiver has already been initialized, // the `receiver_drained` handler is attached as soon @@ -1148,17 +1134,17 @@ describe("Batching - disconnects", function(): void { // Purposefully request more messages than what's available // so that the receiver will have to drain. - const messages1 = await receiver.receiveBatch(10, { maxWaitTimeInMs: 1000 }); + const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); didRequestDrain.should.equal(true, "Drain was not requested."); messages1.length.should.equal(1, "Unexpected number of messages received."); // Make sure that a 2nd receiveMessages call still works // by sending and receiving a single message again. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); // wait for the 2nd message to be received. - const messages2 = await receiver.receiveBatch(1, { maxWaitTimeInMs: 5000 }); + const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); messages2.length.should.equal(1, "Unexpected number of messages received."); }); @@ -1170,7 +1156,7 @@ describe("Batching - disconnects", function(): void { // The first time `receiveMessages` is called the receiver link is created. // The `receiver_drained` handler is only added after the link is created, // which is a non-blocking task. - await receiver.receiveBatch(1, { maxWaitTimeInMs: 1000 }); + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); const receiverContext = (receiver as ReceiverImpl)["_context"]; if (!receiverContext.batchingReceiver!.isOpen()) { @@ -1178,7 +1164,7 @@ describe("Batching - disconnects", function(): void { } // Send a message so we have something to receive. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); // Since the receiver has already been initialized, // the `receiver_drained` handler is attached as soon @@ -1212,7 +1198,7 @@ describe("Batching - disconnects", function(): void { // so that the receiver will have to drain. const testFailureMessage = "Test failure"; try { - await receiver.receiveBatch(10, { maxWaitTimeInMs: 1000 }); + await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); throw new Error(testFailureMessage); } catch (err) { err.message.should.not.equal(testFailureMessage); @@ -1222,10 +1208,10 @@ describe("Batching - disconnects", function(): void { // Make sure that a 2nd receiveMessages call still works // by sending and receiving a single message again. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); // wait for the 2nd message to be received. - const messages = await receiver.receiveBatch(1, { maxWaitTimeInMs: 5000 }); + const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); messages.length.should.equal(1, "Unexpected number of messages received."); }); @@ -1237,7 +1223,7 @@ describe("Batching - disconnects", function(): void { // The first time `receiveMessages` is called the receiver link is created. // The `receiver_drained` handler is only added after the link is created, // which is a non-blocking task. - await receiver.receiveBatch(1, { maxWaitTimeInMs: 1000 }); + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); const receiverContext = (receiver as ReceiverImpl)["_context"]; if (!receiverContext.batchingReceiver!.isOpen()) { @@ -1245,7 +1231,7 @@ describe("Batching - disconnects", function(): void { } // Send a message so we have something to receive. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); // Simulate a disconnect after a message has been received. receiverContext.batchingReceiver!["_receiver"]!.once("message", function() { @@ -1257,16 +1243,16 @@ describe("Batching - disconnects", function(): void { // Purposefully request more messages than what's available // so that the receiver will have to drain. - const messages1 = await receiver.receiveBatch(10, { maxWaitTimeInMs: 10000 }); + const messages1 = await receiver.receiveMessages(10, { maxWaitTimeInMs: 10000 }); messages1.length.should.equal(1, "Unexpected number of messages received."); // Make sure that a 2nd receiveMessages call still works // by sending and receiving a single message again. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); // wait for the 2nd message to be received. - const messages2 = await receiver.receiveBatch(1, { maxWaitTimeInMs: 5000 }); + const messages2 = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); messages2.length.should.equal(1, "Unexpected number of messages received."); }); @@ -1278,7 +1264,7 @@ describe("Batching - disconnects", function(): void { // The first time `receiveMessages` is called the receiver link is created. // The `receiver_drained` handler is only added after the link is created, // which is a non-blocking task. - await receiver.receiveBatch(1, { maxWaitTimeInMs: 1000 }); + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); const receiverContext = (receiver as ReceiverImpl)["_context"]; if (!receiverContext.batchingReceiver!.isOpen()) { @@ -1295,7 +1281,7 @@ describe("Batching - disconnects", function(): void { // so that the receiver will have to drain. const testFailureMessage = "Test failure"; try { - const msgs = await receiver.receiveBatch(10, { maxWaitTimeInMs: 10000 }); + const msgs = await receiver.receiveMessages(10, { maxWaitTimeInMs: 10000 }); console.log(msgs.length); throw new Error(testFailureMessage); } catch (err) { @@ -1304,10 +1290,10 @@ describe("Batching - disconnects", function(): void { // Make sure that a 2nd receiveMessages call still works // by sending and receiving a single message again. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); // wait for the 2nd message to be received. - const messages = await receiver.receiveBatch(1, { maxWaitTimeInMs: 5000 }); + const messages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); messages.length.should.equal(1, "Unexpected number of messages received."); }); diff --git a/sdk/servicebus/service-bus/test/deferredMessage.spec.ts b/sdk/servicebus/service-bus/test/deferredMessage.spec.ts index cd5f1fcb1ff6..9b6924c13f3d 100644 --- a/sdk/servicebus/service-bus/test/deferredMessage.spec.ts +++ b/sdk/servicebus/service-bus/test/deferredMessage.spec.ts @@ -45,15 +45,15 @@ describe("deferred messages", () => { /** * Sends, defers, receives and then returns a test message * @param testMessage Test message to send, defer, receive and then return - * @param useReceiveDeferredMessages Boolean to indicate whether to use `receiveDeferredMessage` or - * `receiveDeferredMessages` to ensure both get code coverage + * @param passSequenceNumberInArray Boolean to indicate whether to pass the sequence number + * as is or in an array to ensure both get code coverage */ async function deferMessage( testMessage: ServiceBusMessage, - useReceiveDeferredMessages: boolean + passSequenceNumberInArray: boolean ): Promise { - await sender.send(testMessage); - const receivedMsgs = await receiver.receiveBatch(1); + await sender.sendMessages(testMessage); + const receivedMsgs = await receiver.receiveMessages(1); should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); should.equal(receivedMsgs[0].body, testMessage.body, "MessageBody is different than expected"); @@ -70,16 +70,9 @@ describe("deferred messages", () => { const sequenceNumber = receivedMsgs[0].sequenceNumber; await receivedMsgs[0].defer(); - let deferredMsg: ReceivedMessageWithLock | undefined; - - // Randomly choose receiveDeferredMessage/receiveDeferredMessages as the latter is expected to - // convert single input to array and then use it - if (useReceiveDeferredMessages) { - [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber as any); - } else { - deferredMsg = await receiver.receiveDeferredMessage(sequenceNumber); - } - + const [deferredMsg] = await receiver.receiveDeferredMessages( + passSequenceNumberInArray ? [sequenceNumber] : sequenceNumber + ); if (!deferredMsg) { throw "No message received for sequence number"; } @@ -101,7 +94,7 @@ describe("deferred messages", () => { ): Promise { await testPeekMsgsLength(receiver, 1); - const deferredMsg = await receiver.receiveDeferredMessage(sequenceNumber); + const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber); if (!deferredMsg) { throw "No message received for sequence number"; } @@ -270,7 +263,7 @@ describe("deferred messages", () => { await testPeekMsgsLength(receiver, 0); - const deadLetterMsgs = await deadLetterReceiver.receiveBatch(1); + const deadLetterMsgs = await deadLetterReceiver.receiveMessages(1); should.equal(deadLetterMsgs.length, 1, "Unexpected number of messages"); should.equal( diff --git a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts index 80e51aa57785..0fa034438b8b 100644 --- a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts @@ -38,7 +38,7 @@ describe("BatchingReceiver unit tests", () => { } as BatchingReceiver; }; - await receiver.receiveBatch(1000, { + await receiver.receiveMessages(1000, { maxWaitTimeInMs: 60 * 1000, abortSignal: origAbortSignal }); diff --git a/sdk/servicebus/service-bus/test/internal/sender.spec.ts b/sdk/servicebus/service-bus/test/internal/sender.spec.ts index 2d0f3432666a..56f569f53826 100644 --- a/sdk/servicebus/service-bus/test/internal/sender.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/sender.spec.ts @@ -25,7 +25,7 @@ describe("sender unit tests", () => { const sender = new SenderImpl(createClientEntityContextForTests()); try { - await sender.send( + await sender.sendMessages( // @ts-expect-error invalidValue ); diff --git a/sdk/servicebus/service-bus/test/invalidParameters.spec.ts b/sdk/servicebus/service-bus/test/invalidParameters.spec.ts index 891e0b3e183f..e394a870d360 100644 --- a/sdk/servicebus/service-bus/test/invalidParameters.spec.ts +++ b/sdk/servicebus/service-bus/test/invalidParameters.spec.ts @@ -316,7 +316,7 @@ describe("invalid parameters", () => { sessionId: TestMessage.sessionId }); - await sender.send(TestMessage.getSessionSample()); + await sender.sendMessages(TestMessage.getSessionSample()); }); after(() => { @@ -455,36 +455,36 @@ describe("invalid parameters", () => { should.equal(caughtError && caughtError.message, `Invalid "MessageHandlers" provided.`); }); - it("ReceiveDeferredMessage: Wrong type sequenceNumber in SessionReceiver", async function(): Promise< + it("ReceiveDeferredMessages: Wrong type sequenceNumber in SessionReceiver", async function(): Promise< void > { let caughtError: Error | undefined; try { - await receiver.receiveDeferredMessage("somestring" as any); + await receiver.receiveDeferredMessages("somestring" as any); } catch (error) { caughtError = error; } should.equal(caughtError && caughtError.name, "TypeError"); should.equal( caughtError && caughtError.message, - `The parameter "sequenceNumber" should be of type "Long"` + `The parameter "sequenceNumbers" should be of type "Long"` ); }); - it("ReceiveDeferredMessage: Missing sequenceNumber in SessionReceiver", async function(): Promise< + it("ReceiveDeferredMessages: Missing sequenceNumber in SessionReceiver", async function(): Promise< void > { let caughtError: Error | undefined; try { - await receiver.receiveDeferredMessage(undefined as any); + await receiver.receiveDeferredMessages(undefined as any); } catch (error) { caughtError = error; } should.equal(caughtError && caughtError.name, "TypeError"); - should.equal(caughtError && caughtError.message, `Missing parameter "sequenceNumber"`); + should.equal(caughtError && caughtError.message, `Missing parameter "sequenceNumbers"`); }); - it("ReceiveDeferredMessages: Wrong type sequenceNumbers in SessionReceiver", async function(): Promise< + it("ReceiveDeferredMessages: Wrong type sequenceNumber array in SessionReceiver", async function(): Promise< void > { let caughtError: Error | undefined; @@ -499,19 +499,6 @@ describe("invalid parameters", () => { `The parameter "sequenceNumbers" should be an array of type "Long"` ); }); - - it("ReceiveDeferredMessages: Missing sequenceNumbers in SessionReceiver", async function(): Promise< - void - > { - let caughtError: Error | undefined; - try { - await receiver.receiveDeferredMessages(undefined as any); - } catch (error) { - caughtError = error; - } - should.equal(caughtError && caughtError.name, "TypeError"); - should.equal(caughtError && caughtError.message, `Missing parameter "sequenceNumbers"`); - }); }); describe("Invalid parameters in Receiver", function(): void { @@ -531,7 +518,7 @@ describe("invalid parameters", () => { receiver = await serviceBusClient.test.getPeekLockReceiver(entityNames); - await sender.send(TestMessage.getSessionSample()); + await sender.sendMessages(TestMessage.getSessionSample()); }); after(async () => { @@ -598,36 +585,36 @@ describe("invalid parameters", () => { should.equal(caughtError && caughtError.message, `Invalid "MessageHandlers" provided.`); }); - it("ReceiveDeferredMessage: Wrong type sequenceNumber in Receiver", async function(): Promise< + it("ReceiveDeferredMessages: Wrong type sequenceNumber in Receiver", async function(): Promise< void > { let caughtError: Error | undefined; try { - await receiver.receiveDeferredMessage("somestring" as any); + await receiver.receiveDeferredMessages("somestring" as any); } catch (error) { caughtError = error; } should.equal(caughtError && caughtError.name, "TypeError"); should.equal( caughtError && caughtError.message, - `The parameter "sequenceNumber" should be of type "Long"` + `The parameter "sequenceNumbers" should be of type "Long"` ); }); - it("ReceiveDeferredMessage: Missing sequenceNumber in Receiver", async function(): Promise< + it("ReceiveDeferredMessages: Missing sequenceNumber in Receiver", async function(): Promise< void > { let caughtError: Error | undefined; try { - await receiver.receiveDeferredMessage(undefined as any); + await receiver.receiveDeferredMessages(undefined as any); } catch (error) { caughtError = error; } should.equal(caughtError && caughtError.name, "TypeError"); - should.equal(caughtError && caughtError.message, `Missing parameter "sequenceNumber"`); + should.equal(caughtError && caughtError.message, `Missing parameter "sequenceNumbers"`); }); - it("ReceiveDeferredMessages: Wrong type sequenceNumbers in Receiver", async function(): Promise< + it("ReceiveDeferredMessages: Wrong type sequenceNumber array in Receiver", async function(): Promise< void > { let caughtError: Error | undefined; @@ -642,19 +629,6 @@ describe("invalid parameters", () => { `The parameter "sequenceNumbers" should be an array of type "Long"` ); }); - - it("ReceiveDeferredMessages: Missing sequenceNumbers in Receiver", async function(): Promise< - void - > { - let caughtError: Error | undefined; - try { - await receiver.receiveDeferredMessages(undefined as any); - } catch (error) { - caughtError = error; - } - should.equal(caughtError && caughtError.name, "TypeError"); - should.equal(caughtError && caughtError.message, `Missing parameter "sequenceNumbers"`); - }); }); describe("Invalid parameters in Sender", function(): void { @@ -675,31 +649,6 @@ describe("invalid parameters", () => { return serviceBusClient.test.afterEach(); }); - it("ScheduledMessage: Missing date in Sender", async function(): Promise { - let caughtError: Error | undefined; - try { - await sender.scheduleMessage(undefined as any, undefined as any); - } catch (error) { - caughtError = error; - } - should.equal(caughtError && caughtError.name, "TypeError"); - should.equal( - caughtError && caughtError.message, - `Missing parameter "scheduledEnqueueTimeUtc"` - ); - }); - - it("ScheduledMessage: Missing message in Sender", async function(): Promise { - let caughtError: Error | undefined; - try { - await sender.scheduleMessage(new Date(), undefined as any); - } catch (error) { - caughtError = error; - } - should.equal(caughtError && caughtError.name, "TypeError"); - should.equal(caughtError && caughtError.message, `Missing parameter "message"`); - }); - it("ScheduledMessages: Missing date in Sender", async function(): Promise { let caughtError: Error | undefined; try { @@ -725,34 +674,36 @@ describe("invalid parameters", () => { should.equal(caughtError && caughtError.message, `Missing parameter "messages"`); }); - it("CancelScheduledMessage: Wrong type sequenceNumber in Sender", async function(): Promise< + it("CancelScheduledMessages: Wrong type sequenceNumber in Sender", async function(): Promise< void > { let caughtError: Error | undefined; try { - await sender.cancelScheduledMessage("somestring" as any); + await sender.cancelScheduledMessages("somestring" as any); } catch (error) { caughtError = error; } should.equal(caughtError && caughtError.name, "TypeError"); should.equal( caughtError && caughtError.message, - `The parameter "sequenceNumber" should be of type "Long"` + `The parameter "sequenceNumbers" should be of type "Long"` ); }); - it("CancelScheduledMessage: Missing sequenceNumber in Sender", async function(): Promise { + it("CancelScheduledMessages: Missing sequenceNumbers in Sender", async function(): Promise< + void + > { let caughtError: Error | undefined; try { - await sender.cancelScheduledMessage(undefined as any); + await sender.cancelScheduledMessages(undefined as any); } catch (error) { caughtError = error; } should.equal(caughtError && caughtError.name, "TypeError"); - should.equal(caughtError && caughtError.message, `Missing parameter "sequenceNumber"`); + should.equal(caughtError && caughtError.message, `Missing parameter "sequenceNumbers"`); }); - it("CancelScheduledMessages: Wrong type sequenceNumbers in Sender", async function(): Promise< + it("CancelScheduledMessages: Wrong type sequenceNumbers array in Sender", async function(): Promise< void > { let caughtError: Error | undefined; @@ -767,18 +718,5 @@ describe("invalid parameters", () => { `The parameter "sequenceNumbers" should be an array of type "Long"` ); }); - - it("CancelScheduledMessages: Missing sequenceNumbers in Sender", async function(): Promise< - void - > { - let caughtError: Error | undefined; - try { - await sender.cancelScheduledMessages(undefined as any); - } catch (error) { - caughtError = error; - } - should.equal(caughtError && caughtError.name, "TypeError"); - should.equal(caughtError && caughtError.message, `Missing parameter "sequenceNumbers"`); - }); }); }); diff --git a/sdk/servicebus/service-bus/test/managementClient.spec.ts b/sdk/servicebus/service-bus/test/managementClient.spec.ts index c749d87b7dab..6d02f0f02a7c 100644 --- a/sdk/servicebus/service-bus/test/managementClient.spec.ts +++ b/sdk/servicebus/service-bus/test/managementClient.spec.ts @@ -51,8 +51,8 @@ describe("ManagementClient - disconnects", function(): void { await beforeEachTest(TestClientType.UnpartitionedQueue); // Send a message so we have something to peek. - await sender.send(TestMessage.getSample()); - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); let peekedMessageCount = 0; let messages = await receiver.peekMessages({ maxMessageCount: 1 }); @@ -90,12 +90,10 @@ describe("ManagementClient - disconnects", function(): void { await beforeEachTest(TestClientType.UnpartitionedQueue); // Send a message so we have something to peek. - const deliveryIds = []; - let deliveryId = await sender.scheduleMessage( + const deliveryIds = await sender.scheduleMessages( new Date("2020-04-25T12:00:00Z"), TestMessage.getSample() ); - deliveryIds.push(deliveryId); deliveryIds.length.should.equal(1, "Unexpected number of scheduled messages."); @@ -115,7 +113,7 @@ describe("ManagementClient - disconnects", function(): void { await delay(2000); // peek additional messages - deliveryId = await sender.scheduleMessage( + const [deliveryId] = await sender.scheduleMessages( new Date("2020-04-25T12:00:00Z"), TestMessage.getSample() ); diff --git a/sdk/servicebus/service-bus/test/propsToModify.spec.ts b/sdk/servicebus/service-bus/test/propsToModify.spec.ts index a50e8c129d91..6baa14397241 100644 --- a/sdk/servicebus/service-bus/test/propsToModify.spec.ts +++ b/sdk/servicebus/service-bus/test/propsToModify.spec.ts @@ -34,7 +34,7 @@ describe("dead lettering", () => { serviceBusClient.createSender(entityNames.queue) ); - await sender.send({ + await sender.sendMessages({ body: "message-body", sessionId: entityNames.usesSessions ? TestMessage.getSessionSample().sessionId : undefined }); @@ -46,7 +46,7 @@ describe("dead lettering", () => { receiver = await serviceBusClient.test.getPeekLockReceiver(entityNames); - const receivedMessages = await receiver.receiveBatch(1, { + const receivedMessages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); @@ -69,7 +69,9 @@ describe("dead lettering", () => { // defer this message so we can pick it up via the management API await receivedMessage.defer(); - const deferredMessage = await receiver.receiveDeferredMessage(receivedMessage.sequenceNumber!); + const [deferredMessage] = await receiver.receiveDeferredMessages( + receivedMessage.sequenceNumber! + ); await deferredMessage!.deadLetter({ deadLetterErrorDescription: "this is the dead letter error description (was deferred)", @@ -109,7 +111,9 @@ describe("dead lettering", () => { // defer this message so we can pick it up via the management API await receivedMessage.defer(); - const deferredMessage = await receiver.receiveDeferredMessage(receivedMessage.sequenceNumber!); + const [deferredMessage] = await receiver.receiveDeferredMessages( + receivedMessage.sequenceNumber! + ); await deferredMessage!.deadLetter({ deadLetterErrorDescription: "this is the dead letter error description (was deferred)", @@ -146,7 +150,7 @@ describe("dead lettering", () => { description: string; customProperty?: string; }) { - const deadLetterMessages = await deadLetterReceiver.receiveBatch(1); + const deadLetterMessages = await deadLetterReceiver.receiveMessages(1); should.exist(deadLetterMessages[0]); const reason = deadLetterMessages[0]!.userProperties!["DeadLetterReason"]; @@ -184,14 +188,14 @@ describe("abandoning", () => { serviceBusClient.createSender(entityNames.queue) ); - await sender.send({ + await sender.sendMessages({ body: "message-body", sessionId: entityNames.usesSessions ? TestMessage.getSessionSample().sessionId : undefined }); receiver = await serviceBusClient.test.getPeekLockReceiver(entityNames); - const receivedMessages = await receiver.receiveBatch(1, { + const receivedMessages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); @@ -211,13 +215,15 @@ describe("abandoning", () => { // defer this message so we can pick it up via the management API await receivedMessage.defer(); - const deferredMessage = await receiver.receiveDeferredMessage(receivedMessage.sequenceNumber!); + const [deferredMessage] = await receiver.receiveDeferredMessages( + receivedMessage.sequenceNumber! + ); await deferredMessage!.abandon({ customProperty: "hello, setting this custom property" }); - const abandonedMessage = await receiver.receiveDeferredMessage( + const [abandonedMessage] = await receiver.receiveDeferredMessages( deferredMessage!.sequenceNumber! ); await checkAbandonedMessage(abandonedMessage!, { @@ -231,7 +237,7 @@ describe("abandoning", () => { customProperty: "hello, setting this custom property" }); - const abandonedMessage = (await receiver.receiveBatch(1))[0]; + const abandonedMessage = (await receiver.receiveMessages(1))[0]; await checkAbandonedMessage(abandonedMessage, { customProperty: "hello, setting this custom property" }); @@ -242,13 +248,15 @@ describe("abandoning", () => { // defer this message so we can pick it up via the management API await receivedMessage.defer(); - const deferredMessage = await receiver.receiveDeferredMessage(receivedMessage.sequenceNumber!); + const [deferredMessage] = await receiver.receiveDeferredMessages( + receivedMessage.sequenceNumber! + ); await deferredMessage!.abandon({ customProperty: "hello, setting this custom property" }); - const abandonedMessage = await receiver.receiveDeferredMessage( + const [abandonedMessage] = await receiver.receiveDeferredMessages( deferredMessage!.sequenceNumber! ); await checkAbandonedMessage(abandonedMessage!, { @@ -262,7 +270,7 @@ describe("abandoning", () => { customProperty: "hello, setting this custom property" }); - const abandonedMessage = (await receiver.receiveBatch(1))[0]; + const abandonedMessage = (await receiver.receiveMessages(1))[0]; await checkAbandonedMessage(abandonedMessage, { customProperty: "hello, setting this custom property" }); @@ -305,14 +313,14 @@ describe("deferring", () => { serviceBusClient.createSender(entityNames.queue) ); - await sender.send({ + await sender.sendMessages({ body: "message-body", sessionId: entityNames.usesSessions ? TestMessage.getSessionSample().sessionId : undefined }); receiver = await serviceBusClient.test.getPeekLockReceiver(entityNames); - const receivedMessages = await receiver.receiveBatch(1, { + const receivedMessages = await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }); @@ -332,7 +340,9 @@ describe("deferring", () => { // defer this message so we can pick it up via the management API await receivedMessage.defer(); - const deferredMessage = await receiver.receiveDeferredMessage(receivedMessage.sequenceNumber!); + const [deferredMessage] = await receiver.receiveDeferredMessages( + receivedMessage.sequenceNumber! + ); await deferredMessage!.defer({ customProperty: "hello, setting this custom property" @@ -359,7 +369,9 @@ describe("deferring", () => { // defer this message so we can pick it up via the management API await receivedMessage.defer(); - const deferredMessage = await receiver.receiveDeferredMessage(receivedMessage.sequenceNumber!); + const [deferredMessage] = await receiver.receiveDeferredMessages( + receivedMessage.sequenceNumber! + ); await deferredMessage!.defer({ customProperty: "hello, setting this custom property" @@ -382,7 +394,9 @@ describe("deferring", () => { }); async function checkDeferredMessage(expected: { customProperty?: string }) { - const deferredMessage = await receiver.receiveDeferredMessage(receivedMessage.sequenceNumber!); + const [deferredMessage] = await receiver.receiveDeferredMessages( + receivedMessage.sequenceNumber! + ); should.exist(deferredMessage); diff --git a/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts b/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts index f8e302f6fc27..c984c7e1651f 100644 --- a/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts +++ b/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts @@ -64,8 +64,8 @@ describe("receive and delete", () => { }); async function sendReceiveMsg(testMessages: ServiceBusMessage): Promise { - await sender.send(testMessages); - const msgs = await receiver.receiveBatch(1); + await sender.sendMessages(testMessages); + const msgs = await receiver.receiveMessages(1); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Unexpected number of messages"); @@ -153,7 +153,7 @@ describe("receive and delete", () => { testMessages: ServiceBusMessage, autoCompleteFlag: boolean ): Promise { - await sender.send(testMessages); + await sender.sendMessages(testMessages); const errors: string[] = []; const receivedMsgs: ReceivedMessage[] = []; @@ -323,8 +323,8 @@ describe("receive and delete", () => { }); async function sendReceiveMsg(testMessages: ServiceBusMessage): Promise { - await sender.send(testMessages); - const msgs = await receiver.receiveBatch(1); + await sender.sendMessages(testMessages); + const msgs = await receiver.receiveMessages(1); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Unexpected number of messages"); @@ -694,8 +694,8 @@ describe("receive and delete", () => { }); async function deferMessage(useSessions?: boolean): Promise { const testMessages = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); - await sender.send(testMessages); - const batch = await receiver.receiveBatch(1); + await sender.sendMessages(testMessages); + const batch = await receiver.receiveMessages(1); const msgs = batch; should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); @@ -713,12 +713,12 @@ describe("receive and delete", () => { } async function receiveDeferredMessage(): Promise { - const deferredMsgs = await receiver.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { + const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber); + if (!deferredMsg) { throw `No message received for sequence number ${sequenceNumber}`; } - should.equal(deferredMsgs!.deliveryCount, 1, "DeliveryCount is different than expected"); + should.equal(deferredMsg!.deliveryCount, 1, "DeliveryCount is different than expected"); await testPeekMsgsLength(receiver, 0); } diff --git a/sdk/servicebus/service-bus/test/renewLock.spec.ts b/sdk/servicebus/service-bus/test/renewLock.spec.ts index d4c7e2083330..3c96fd65fc42 100644 --- a/sdk/servicebus/service-bus/test/renewLock.spec.ts +++ b/sdk/servicebus/service-bus/test/renewLock.spec.ts @@ -271,9 +271,9 @@ describe("renew lock", () => { receiver: Receiver ): Promise { const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); - const msgs = await receiver.receiveBatch(1); + const msgs = await receiver.receiveMessages(1); // Compute expected initial lock expiry time const expectedLockExpiryTimeUtc = new Date(); @@ -319,9 +319,9 @@ describe("renew lock", () => { receiver: Receiver ): Promise { const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); - const msgs = await receiver.receiveBatch(1); + const msgs = await receiver.receiveMessages(1); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Expected message length does not match"); @@ -340,7 +340,7 @@ describe("renew lock", () => { should.equal(errorWasThrown, true, "Error thrown flag must be true"); // Clean up any left over messages - const unprocessedMsgsBatch = await receiver.receiveBatch(1); + const unprocessedMsgsBatch = await receiver.receiveMessages(1); await unprocessedMsgsBatch[0].complete(); } @@ -353,7 +353,7 @@ describe("renew lock", () => { ): Promise { let numOfMessagesReceived = 0; const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); async function processMessage(brokeredMessage: ReceivedMessageWithLock): Promise { if (numOfMessagesReceived < 1) { @@ -431,7 +431,7 @@ describe("renew lock", () => { ): Promise { let numOfMessagesReceived = 0; const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); async function processMessage(brokeredMessage: ReceivedMessageWithLock): Promise { if (numOfMessagesReceived < 1) { @@ -484,7 +484,7 @@ describe("renew lock", () => { await serviceBusClient.test.createTestEntities(entityType) ); - const unprocessedMsgsBatch = await receiver.receiveBatch(1); + const unprocessedMsgsBatch = await receiver.receiveMessages(1); if (unprocessedMsgsBatch.length) { await unprocessedMsgsBatch[0].complete(); } diff --git a/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts b/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts index 9c26595fe610..29f12da4579e 100644 --- a/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts @@ -339,9 +339,9 @@ describe("renew lock sessions", () => { ): Promise { const testMessage = getTestMessage(); testMessage.body = `testBatchReceiverManualLockRenewalHappyCase-${Date.now().toString()}`; - await sender.send(testMessage); + await sender.sendMessages(testMessage); - const msgs = await receiver.receiveBatch(1); + const msgs = await receiver.receiveMessages(1); // Compute expected initial lock expiry time const expectedLockExpiryTimeUtc = new Date(); @@ -387,9 +387,9 @@ describe("renew lock sessions", () => { ): Promise { const testMessage = getTestMessage(); testMessage.body = `testBatchReceiverManualLockRenewalErrorOnLockExpiry-${Date.now().toString()}`; - await sender.send(testMessage); + await sender.sendMessages(testMessage); - const msgs = await receiver.receiveBatch(1); + const msgs = await receiver.receiveMessages(1); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Expected message length does not match"); @@ -412,7 +412,7 @@ describe("renew lock sessions", () => { const entityNames = serviceBusClient.test.getTestEntities(entityType); receiver = await serviceBusClient.test.getSessionPeekLockReceiver(entityNames); - const unprocessedMsgsBatch = await receiver.receiveBatch(1); + const unprocessedMsgsBatch = await receiver.receiveMessages(1); should.equal(unprocessedMsgsBatch[0].deliveryCount, 1, "Unexpected deliveryCount"); await unprocessedMsgsBatch[0].complete(); } @@ -427,7 +427,7 @@ describe("renew lock sessions", () => { let numOfMessagesReceived = 0; const testMessage = getTestMessage(); testMessage.body = `testStreamingReceiverManualLockRenewalHappyCase-${Date.now().toString()}`; - await sender.send(testMessage); + await sender.sendMessages(testMessage); async function processMessage(brokeredMessage: ReceivedMessageWithLock) { if (numOfMessagesReceived < 1) { @@ -504,7 +504,7 @@ describe("renew lock sessions", () => { let numOfMessagesReceived = 0; const testMessage = getTestMessage(); testMessage.body = `testAutoLockRenewalConfigBehavior-${Date.now().toString()}`; - await sender.send(testMessage); + await sender.sendMessages(testMessage); let sessionLockLostErrorThrown = false; const messagesReceived: ReceivedMessageWithLock[] = []; diff --git a/sdk/servicebus/service-bus/test/retries.spec.ts b/sdk/servicebus/service-bus/test/retries.spec.ts index 16485c4316a8..3c8194848f59 100644 --- a/sdk/servicebus/service-bus/test/retries.spec.ts +++ b/sdk/servicebus/service-bus/test/retries.spec.ts @@ -90,13 +90,6 @@ describe("Retries - ManagementClient", () => { await afterEachTest(); }); - it("Unpartitioned Queue: scheduleMessage", async function(): Promise { - await beforeEachTest(TestClientType.UnpartitionedQueue); - await mockManagementClientAndVerifyRetries(async () => { - await sender.scheduleMessage(new Date(), TestMessage.getSample()); - }); - }); - it("Unpartitioned Queue: scheduleMessages", async function(): Promise { await beforeEachTest(TestClientType.UnpartitionedQueue); await mockManagementClientAndVerifyRetries(async () => { @@ -104,13 +97,6 @@ describe("Retries - ManagementClient", () => { }); }); - it("Unpartitioned Queue with Sessions: scheduleMessage", async function(): Promise { - await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); - await mockManagementClientAndVerifyRetries(async () => { - await sender.cancelScheduledMessage(new Long(0)); - }); - }); - it("Unpartitioned Queue with Sessions: scheduleMessages", async function(): Promise { await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); await mockManagementClientAndVerifyRetries(async () => { @@ -131,7 +117,7 @@ describe("Retries - ManagementClient", () => { it("Unpartitioned Queue: receiveDeferredMessage", async function(): Promise { await mockManagementClientAndVerifyRetries(async () => { - await receiver.receiveDeferredMessage(new Long(0)); + await receiver.receiveDeferredMessages(new Long(0)); }); }); @@ -175,7 +161,7 @@ describe("Retries - ManagementClient", () => { void > { await mockManagementClientAndVerifyRetries(async () => { - await sessionReceiver.receiveDeferredMessage(new Long(0)); + await sessionReceiver.receiveDeferredMessages(new Long(0)); }); }); @@ -294,7 +280,7 @@ describe("Retries - MessageSender", () => { it("Unpartitioned Queue: send", async function(): Promise { await beforeEachTest(TestClientType.UnpartitionedQueue); await mockInitAndVerifyRetries(async () => { - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); }); }); @@ -312,14 +298,14 @@ describe("Retries - MessageSender", () => { batch.tryAdd({ body: "hello" }); - await sender.send(batch); + await sender.sendMessages(batch); }); }); it("Unpartitioned Queue with Sessions: send", async function(): Promise { await beforeEachTest(TestClientType.UnpartitionedQueue); await mockInitAndVerifyRetries(async () => { - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); }); }); @@ -337,7 +323,7 @@ describe("Retries - MessageSender", () => { batch.tryAdd({ body: "hello" }); - await sender.send(batch); + await sender.sendMessages(batch); }); }); }); @@ -409,14 +395,14 @@ describe("Retries - Receive methods", () => { it("Unpartitioned Queue: receiveBatch", async function(): Promise { await beforeEachTest(TestClientType.UnpartitionedQueue); await mockReceiveAndVerifyRetries(async () => { - await receiver.receiveBatch(1); + await receiver.receiveMessages(1); }); }); it("Unpartitioned Queue with Sessions: receiveBatch", async function(): Promise { await beforeEachTest(TestClientType.UnpartitionedQueueWithSessions); await mockReceiveAndVerifyRetries(async () => { - await receiver.receiveBatch(1); + await receiver.receiveMessages(1); }); }); diff --git a/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts b/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts index bdb4553df806..8ad86cc5797f 100644 --- a/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts +++ b/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts @@ -52,8 +52,8 @@ describe("send scheduled messages", () => { async function testSimpleSend(useSessions: boolean, usePartitions: boolean): Promise { const testMessage = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); - await sender.send(testMessage); - const msgs = await receiver.receiveBatch(1); + await sender.sendMessages(testMessage); + const msgs = await receiver.receiveMessages(1); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Unexpected number of messages"); @@ -120,8 +120,8 @@ describe("send scheduled messages", () => { testMessages.push(useSessions ? TestMessage.getSessionSample() : TestMessage.getSample()); testMessages.push(useSessions ? TestMessage.getSessionSample() : TestMessage.getSample()); - await sender.send(testMessages); - const msgs = await receiver.receiveBatch(2); + await sender.sendMessages(testMessages); + const msgs = await receiver.receiveMessages(2); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 2, "Unexpected number of messages"); @@ -189,25 +189,22 @@ describe("send scheduled messages", () => { /** * Schedules a test message message to be sent at a later time, waits and then receives it * @param useSessions Set to true if using session enabled queues or subscriptions - * @param useScheduleMessages Boolean to indicate whether to use `scheduleMessage` or - * `scheduleMessages` to ensure both get code coverage + * @param passSequenceNumberInArray Boolean to indicate whether to pass the sequence number + * as is or in an array to ensure both get code coverage */ async function testScheduleMessage( useSessions: boolean, - useScheduleMessages: boolean + passSequenceNumberInArray: boolean ): Promise { const testMessage = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); const scheduleTime = new Date(Date.now() + 10000); // 10 seconds from - // Randomly choose scheduleMessage/scheduleMessages as the latter is expected to convert single - // input to array and then use it - if (useScheduleMessages) { - await sender.scheduleMessages(scheduleTime, testMessage as any); - } else { - await sender.scheduleMessage(scheduleTime, testMessage); - } + await sender.scheduleMessages( + scheduleTime, + passSequenceNumberInArray ? [testMessage] : testMessage + ); - const msgs = await receiver.receiveBatch(1); + const msgs = await receiver.receiveMessages(1); const msgEnqueueTime = msgs[0].enqueuedTimeUtc ? msgs[0].enqueuedTimeUtc.valueOf() : 0; should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); @@ -309,7 +306,7 @@ describe("send scheduled messages", () => { const scheduleTime = new Date(Date.now() + 10000); // 10 seconds from now await sender.scheduleMessages(scheduleTime, testMessages); - const msgs = await receiver.receiveBatch(2); + const msgs = await receiver.receiveMessages(2); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 2, "Unexpected number of messages"); @@ -401,11 +398,11 @@ describe("send scheduled messages", () => { async function testCancelScheduleMessage(useSessions?: boolean): Promise { const testMessage = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); const scheduleTime = new Date(Date.now() + 30000); // 30 seconds from now as anything less gives inconsistent results for cancelling - const sequenceNumber = await sender.scheduleMessage(scheduleTime, testMessage); + const [sequenceNumber] = await sender.scheduleMessages(scheduleTime, testMessage); await delay(2000); - await sender.cancelScheduledMessage(sequenceNumber); + await sender.cancelScheduledMessages(sequenceNumber); // Wait until we are sure we have passed the schedule time await delay(30000); @@ -467,11 +464,13 @@ describe("send scheduled messages", () => { }); async function testCancelScheduleMessages(useSessions?: boolean): Promise { - const testMessage = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); + const getTestMessage = useSessions ? TestMessage.getSessionSample : TestMessage.getSample; const scheduleTime = new Date(Date.now() + 30000); // 30 seconds from now as anything less gives inconsistent results for cancelling - const sequenceNumber1 = await sender.scheduleMessage(scheduleTime, testMessage); - const sequenceNumber2 = await sender.scheduleMessage(scheduleTime, testMessage); + const [sequenceNumber1, sequenceNumber2] = await sender.scheduleMessages(scheduleTime, [ + getTestMessage(), + getTestMessage() + ]); await delay(2000); @@ -544,12 +543,13 @@ describe("send scheduled messages", () => { { body: "Hello, again!" }, { body: "Hello, again and again!!" } ]; - let sequenceNumbers = await Promise.all([ + let [result1, result2, result3] = await Promise.all([ // Schedule messages in parallel - sender.scheduleMessage(date, messages[0]), - sender.scheduleMessage(date, messages[1]), - sender.scheduleMessage(date, messages[2]) + sender.scheduleMessages(date, messages[0]), + sender.scheduleMessages(date, messages[1]), + sender.scheduleMessages(date, messages[2]) ]); + const sequenceNumbers = [result1[0], result2[0], result3[0]] compareSequenceNumbers(sequenceNumbers[0], sequenceNumbers[1]); compareSequenceNumbers(sequenceNumbers[0], sequenceNumbers[2]); compareSequenceNumbers(sequenceNumbers[1], sequenceNumbers[2]); @@ -562,7 +562,7 @@ describe("send scheduled messages", () => { ); } - const receivedMsgs = await receiver.receiveBatch(3); + const receivedMsgs = await receiver.receiveMessages(3); should.equal(receivedMsgs.length, 3, "Unexpected number of messages"); for (const seqNum of sequenceNumbers) { const msgWithSeqNum = receivedMsgs.find( @@ -676,9 +676,9 @@ describe("send scheduled messages", () => { ]; testInputs.forEach(function(testInput: any): void { - it("Send() throws if " + testInput.title, async function(): Promise { + it("SendMessages() throws if " + testInput.title, async function(): Promise { let actualErrorMsg = ""; - await sender.send(testInput.message).catch((err) => { + await sender.sendMessages(testInput.message).catch((err) => { actualErrorMsg = err.message; }); should.equal( @@ -719,10 +719,10 @@ describe("send scheduled messages", () => { // } // ); - it("ScheduleMessage() throws if " + testInput.title, async function(): Promise { + it("ScheduleMessages() throws if " + testInput.title, async function(): Promise { let actualErrorMsg = ""; let actualErr; - await sender.scheduleMessage(new Date(), testInput.message).catch((err) => { + await sender.scheduleMessages(new Date(), testInput.message).catch((err) => { actualErr = err; actualErrorMsg = err.message; }); @@ -735,7 +735,7 @@ describe("send scheduled messages", () => { receiver: Receiver, expectedReceivedMsgsLength: number ): Promise { - const receivedMsgs = await receiver.receiveBatch(expectedReceivedMsgsLength + 1, { + const receivedMsgs = await receiver.receiveMessages(expectedReceivedMsgsLength + 1, { maxWaitTimeInMs: 5000 }); @@ -747,20 +747,6 @@ describe("send scheduled messages", () => { } describe("Cancel operations on the sender", function(): void { - it("Abort scheduleMessage request on the sender", async function(): Promise { - await beforeEachTest(TestClientType.PartitionedQueue); - const controller = new AbortController(); - setTimeout(() => controller.abort(), 1); - try { - await sender.scheduleMessage(new Date(), TestMessage.getSample(), { - abortSignal: controller.signal - }); - throw new Error(`Test failure`); - } catch (err) { - err.message.should.equal("The scheduleMessage operation has been cancelled by the user."); - } - }); - it("Abort scheduleMessages request on the sender", async function(): Promise { await beforeEachTest(TestClientType.PartitionedQueue); const controller = new AbortController(); @@ -775,20 +761,6 @@ describe("send scheduled messages", () => { } }); - it("Abort cancelScheduledMessage request on the sender", async function(): Promise { - await beforeEachTest(TestClientType.PartitionedQueue); - const controller = new AbortController(); - setTimeout(() => controller.abort(), 1); - try { - await sender.cancelScheduledMessage(Long.ZERO, { abortSignal: controller.signal }); - throw new Error(`Test failure`); - } catch (err) { - err.message.should.equal( - "The cancelScheduledMessage operation has been cancelled by the user." - ); - } - }); - it("Abort cancelScheduledMessages request on the sender", async function(): Promise { await beforeEachTest(TestClientType.PartitionedQueue); const controller = new AbortController(); diff --git a/sdk/servicebus/service-bus/test/sendBatch.spec.ts b/sdk/servicebus/service-bus/test/sendBatch.spec.ts index cc5aa7d152ef..26f6464df95c 100644 --- a/sdk/servicebus/service-bus/test/sendBatch.spec.ts +++ b/sdk/servicebus/service-bus/test/sendBatch.spec.ts @@ -76,7 +76,7 @@ describe("Send Batch", () => { sentMessages.push(messageToSend); } } - await sender.send(batchMessage); + await sender.sendMessages(batchMessage); // receive all the messages in receive and delete mode await serviceBusClient.test.verifyAndDeleteAllSentMessages( entityNames, @@ -161,7 +161,7 @@ describe("Send Batch", () => { sentMessages.push(messageToSend); } } - await sender.send(batchMessage); + await sender.sendMessages(batchMessage); // receive all the messages in receive and delete mode await serviceBusClient.test.verifyAndDeleteAllSentMessages( entityNames, @@ -237,7 +237,7 @@ describe("Send Batch", () => { sentMessages.push(messageToSend); } } - await sender.send(batchMessage); + await sender.sendMessages(batchMessage); // receive all the messages in receive and delete mode await serviceBusClient.test.verifyAndDeleteAllSentMessages( entityNames, @@ -316,7 +316,7 @@ describe("Send Batch", () => { sentMessages.push(messageToSend); } } - await sender.send(batchMessage); + await sender.sendMessages(batchMessage); // receive all the messages in receive and delete mode await serviceBusClient.test.verifyAndDeleteAllSentMessages( entityNames, @@ -414,7 +414,7 @@ describe("Send Batch", () => { sentMessages.push(messageToSend); } } - await sender.send(batchMessage); + await sender.sendMessages(batchMessage); // receive all the messages in receive and delete mode await serviceBusClient.test.verifyAndDeleteAllSentMessages( entityNames, @@ -523,7 +523,7 @@ describe("Send Batch", () => { false, "tryAdd should have failed for the fourth message" ); - await sender.send(batchMessage); + await sender.sendMessages(batchMessage); // receive all the messages in receive and delete mode await serviceBusClient.test.verifyAndDeleteAllSentMessages(entityNames, useSessions, [ messagesToSend[0] @@ -642,7 +642,7 @@ describe("Send Batch", () => { > { await beforeEachTest(TestClientType.PartitionedQueue); try { - await sender.send( + await sender.sendMessages( [{ body: "ignored since anything will be bigger than the batch size I passed" }], { // this isn't a documented option for send(batch) but we do pass it through to the underlying @@ -672,7 +672,7 @@ describe("Send Batch", () => { } } - await sender.send(batch); + await sender.sendMessages(batch); await serviceBusClient.test.verifyAndDeleteAllSentMessages(entityNames, false, messagesToSend); }); }); diff --git a/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts index 6c7213208117..6780e9591b96 100644 --- a/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts @@ -78,10 +78,10 @@ describe("Random scheme in the endpoint from connection string", function(): voi }); async function sendReceiveMsg(testMessages: ServiceBusMessage): Promise { - await sender.send(testMessages); + await sender.sendMessages(testMessages); await testPeekMsgsLength(receiver, 1); - const msgs = await receiver.receiveBatch(1); + const msgs = await receiver.receiveMessages(1); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Unexpected number of messages"); @@ -145,7 +145,7 @@ describe("Errors with non existing Namespace", function(): void { void > { const receiver = sbClient.createReceiver("some-queue", "peekLock"); - await receiver.receiveBatch(10).catch(testError); + await receiver.receiveMessages(10).catch(testError); should.equal(errorWasThrown, true, "Error thrown flag must be true"); }); @@ -207,7 +207,7 @@ describe("Errors with non existing Queue/Topic/Subscription", async function(): void > { const receiver = sbClient.createReceiver("some-name", "peekLock"); - await receiver.receiveBatch(1).catch((err) => testError(err, "some-name")); + await receiver.receiveMessages(1).catch((err) => testError(err, "some-name")); should.equal(errorWasThrown, true, "Error thrown flag must be true"); }); @@ -221,7 +221,7 @@ describe("Errors with non existing Queue/Topic/Subscription", async function(): "peekLock" ); await receiver - .receiveBatch(1) + .receiveMessages(1) .catch((err) => testError(err, "some-topic-name/Subscriptions/some-subscription-name")); should.equal(errorWasThrown, true, "Error thrown flag must be true"); @@ -363,8 +363,8 @@ describe("Test ServiceBusClient creation", function(): void { const sender = sbClient.createSender(entities.queue!); const receiver = sbClient.createReceiver(entities.queue!, "peekLock"); const testMessages = TestMessage.getSample(); - await sender.send(testMessages); - const msgs = await receiver.receiveBatch(1); + await sender.sendMessages(testMessages); + const msgs = await receiver.receiveMessages(1); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs[0].body, testMessages.body, "MessageBody is different than expected"); @@ -400,8 +400,8 @@ describe("Errors after close()", function(): void { const testMessage = entityName.usesSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); - await sender.send(testMessage); - const receivedMsgs = await receiver.receiveBatch(1, { maxWaitTimeInMs: 5000 }); + await sender.sendMessages(testMessage); + const receivedMsgs = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); should.equal(receivedMsgs.length, 1, "Unexpected number of messages received"); receivedMessage = receivedMsgs[0]; @@ -475,10 +475,10 @@ describe("Errors after close()", function(): void { const testMessage = TestMessage.getSample(); let errorSend: string = ""; - await sender.send(testMessage).catch((err) => { + await sender.sendMessages(testMessage).catch((err) => { errorSend = err.message; }); - should.equal(errorSend, expectedErrorMsg, "Expected error not thrown for send()"); + should.equal(errorSend, expectedErrorMsg, "Expected error not thrown for sendMessages()"); let errorCreateBatch: string = ""; await sender.createBatch().catch((err) => { @@ -487,21 +487,11 @@ describe("Errors after close()", function(): void { should.equal(errorCreateBatch, expectedErrorMsg, "Expected error not thrown for createBatch()"); let errorSendBatch: string = ""; - await sender.send(1 as any).catch((err) => { + await sender.sendMessages(1 as any).catch((err) => { errorSendBatch = err.message; }); should.equal(errorSendBatch, expectedErrorMsg, "Expected error not thrown for sendBatch()"); - let errorScheduleMsg: string = ""; - await sender.scheduleMessage(new Date(Date.now() + 30000), testMessage).catch((err) => { - errorScheduleMsg = err.message; - }); - should.equal( - errorScheduleMsg, - expectedErrorMsg, - "Expected error not thrown for scheduleMessage()" - ); - let errorScheduleMsgs: string = ""; await sender.scheduleMessages(new Date(Date.now() + 30000), [testMessage]).catch((err) => { errorScheduleMsgs = err.message; @@ -512,16 +502,6 @@ describe("Errors after close()", function(): void { "Expected error not thrown for scheduleMessages()" ); - let errorCancelMsg: string = ""; - await sender.cancelScheduledMessage(Long.ZERO).catch((err) => { - errorCancelMsg = err.message; - }); - should.equal( - errorCancelMsg, - expectedErrorMsg, - "Expected error not thrown for cancelScheduledMessage()" - ); - let errorCancelMsgs: string = ""; await sender.cancelScheduledMessages([Long.ZERO]).catch((err) => { errorCancelMsgs = err.message; @@ -553,7 +533,7 @@ describe("Errors after close()", function(): void { should.equal(receiver.isClosed, true, "Receiver is not marked as closed."); let errorReceiveBatch: string = ""; - await receiver.receiveBatch(1, { maxWaitTimeInMs: 1000 }).catch((err) => { + await receiver.receiveMessages(1, { maxWaitTimeInMs: 1000 }).catch((err) => { errorReceiveBatch = err.message; }); should.equal( @@ -579,18 +559,8 @@ describe("Errors after close()", function(): void { "Expected error not thrown for registerMessageHandler()" ); - let errorDeferredMsg: string = ""; - await receiver.receiveDeferredMessage(Long.ZERO).catch((err) => { - errorDeferredMsg = err.message; - }); - should.equal( - errorDeferredMsg, - expectedErrorMsg, - "Expected error not thrown for receiveDeferredMessage()" - ); - let errorDeferredMsgs: string = ""; - await receiver.receiveDeferredMessage(Long.ZERO).catch((err) => { + await receiver.receiveDeferredMessages(Long.ZERO).catch((err) => { errorDeferredMsgs = err.message; }); should.equal( diff --git a/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts b/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts index 46d8b72a9d92..4cf8f91ea620 100644 --- a/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts +++ b/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts @@ -70,7 +70,7 @@ describe("sessions tests - requires completely clean entity for each test", () useSessionId: boolean ): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const entityNames = serviceBusClient.test.getTestEntities(testClientType); @@ -95,7 +95,7 @@ describe("sessions tests - requires completely clean entity for each test", () "SessionId is different than expected" ); - const msgs = await receiver.receiveBatch(1); + const msgs = await receiver.receiveMessages(1); should.equal(msgs.length, 1, "Unexpected number of messages received"); should.equal(msgs[0].body, testMessage.body, "MessageBody is different than expected"); should.equal( @@ -163,13 +163,13 @@ describe("sessions tests - requires completely clean entity for each test", () ]; async function testComplete_batching(testClientType: TestClientType): Promise { - await sender.send(testMessagesWithDifferentSessionIds[0]); - await sender.send(testMessagesWithDifferentSessionIds[1]); + await sender.sendMessages(testMessagesWithDifferentSessionIds[0]); + await sender.sendMessages(testMessagesWithDifferentSessionIds[1]); const entityNames = serviceBusClient.test.getTestEntities(testClientType); receiver = await serviceBusClient.test.getSessionPeekLockReceiver(entityNames); - let msgs = await receiver.receiveBatch(2); + let msgs = await receiver.receiveMessages(2); should.equal(msgs.length, 1, "Unexpected number of messages received"); should.equal(receiver.sessionId, msgs[0].sessionId, "Unexpected sessionId in receiver"); @@ -189,7 +189,7 @@ describe("sessions tests - requires completely clean entity for each test", () // get the next available session ID rather than specifying one receiver = await serviceBusClient.test.getSessionPeekLockReceiver(entityNames); - msgs = await receiver.receiveBatch(2); + msgs = await receiver.receiveMessages(2); should.equal(msgs.length, 1, "Unexpected number of messages received"); should.equal(receiver.sessionId, msgs[0].sessionId, "Unexpected sessionId in receiver"); @@ -257,8 +257,8 @@ describe("sessions tests - requires completely clean entity for each test", () ]; async function testComplete_batching(testClientType: TestClientType): Promise { - await sender.send(testMessagesWithDifferentSessionIds[0]); - await sender.send(testMessagesWithDifferentSessionIds[1]); + await sender.sendMessages(testMessagesWithDifferentSessionIds[0]); + await sender.sendMessages(testMessagesWithDifferentSessionIds[1]); const entityNames = serviceBusClient.test.getTestEntities(testClientType); @@ -267,7 +267,7 @@ describe("sessions tests - requires completely clean entity for each test", () sessionId: "" }); - const msgs = await receiver.receiveBatch(2); + const msgs = await receiver.receiveMessages(2); should.equal(msgs.length, 1, "Unexpected number of messages received"); diff --git a/sdk/servicebus/service-bus/test/sessionsTests.spec.ts b/sdk/servicebus/service-bus/test/sessionsTests.spec.ts index 70b6518ac1ed..538edbfebddd 100644 --- a/sdk/servicebus/service-bus/test/sessionsTests.spec.ts +++ b/sdk/servicebus/service-bus/test/sessionsTests.spec.ts @@ -79,9 +79,9 @@ describe("session tests", () => { async function test_batching(testClientType: TestClientType): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); - let msgs = await receiver.receiveBatch(1, { maxWaitTimeInMs: 10000 }); + let msgs = await receiver.receiveMessages(1, { maxWaitTimeInMs: 10000 }); should.equal(msgs.length, 0, "Unexpected number of messages received"); await receiver.close(); @@ -91,7 +91,7 @@ describe("session tests", () => { // get the next available session ID rather than specifying one receiver = await serviceBusClient.test.getSessionPeekLockReceiver(entityNames); - msgs = await receiver.receiveBatch(1); + msgs = await receiver.receiveMessages(1); should.equal(msgs.length, 1, "Unexpected number of messages received"); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs[0].body, testMessage.body, "MessageBody is different than expected"); @@ -140,7 +140,7 @@ describe("session tests", () => { async function test_streaming(testClientType: TestClientType): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); let receivedMsgs: ReceivedMessage[] = []; receiver.subscribe({ @@ -226,9 +226,9 @@ describe("session tests", () => { async function testGetSetState(testClientType: TestClientType): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); - let msgs = await receiver.receiveBatch(2); + let msgs = await receiver.receiveMessages(2); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Unexpected number of messages received"); @@ -257,7 +257,7 @@ describe("session tests", () => { // get the next available session ID rather than specifying one receiver = await serviceBusClient.test.getSessionPeekLockReceiver(entityNames); - msgs = await receiver.receiveBatch(2); + msgs = await receiver.receiveMessages(2); should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); should.equal(msgs.length, 1, "Unexpected number of messages received"); @@ -347,22 +347,6 @@ describe("session tests", () => { } }); - it("Abort receiveDeferredMessage request on the session receiver", async function(): Promise< - void - > { - await beforeEachTest(TestClientType.PartitionedQueueWithSessions, TestMessage.sessionId); - const controller = new AbortController(); - setTimeout(() => controller.abort(), 1); - try { - await receiver.receiveDeferredMessage(Long.ZERO, { abortSignal: controller.signal }); - throw new Error(`Test failure`); - } catch (err) { - err.message.should.equal( - "The receiveDeferredMessage operation has been cancelled by the user." - ); - } - }); - it("Abort receiveDeferredMessages request on the session receiver", async function(): Promise< void > { @@ -407,7 +391,7 @@ describe.skip("SessionReceiver - disconnects", function(): void { }); const sender = serviceBusClient.createSender(entityName.queue!); // Send a message so we can be sure when the receiver is open and active. - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedErrors: any[] = []; let settledMessageCount = 0; @@ -468,7 +452,7 @@ describe.skip("SessionReceiver - disconnects", function(): void { // Otherwise, it will get into a bad internal state with uncaught exceptions. await delay(2000); // send a second message to trigger the message handler again. - await sender.send(TestMessage.getSessionSample()); + await sender.sendMessages(TestMessage.getSessionSample()); console.log("Waiting for 2nd message"); // wait for the 2nd message to be received. await receiverSecondMessage; diff --git a/sdk/servicebus/service-bus/test/smoketest.spec.ts b/sdk/servicebus/service-bus/test/smoketest.spec.ts index 933d65db131f..2bb53d005db9 100644 --- a/sdk/servicebus/service-bus/test/smoketest.spec.ts +++ b/sdk/servicebus/service-bus/test/smoketest.spec.ts @@ -74,7 +74,7 @@ describe("Sample scenarios for track 2", () => { const receivedBodies: string[] = []; - for (const message of await receiver.receiveBatch(1, { maxWaitTimeInMs: 5000 })) { + for (const message of await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 })) { receivedBodies.push(message.body); } @@ -438,17 +438,17 @@ describe("Sample scenarios for track 2", () => { switch (method) { case "single": { - await sender.send(message); + await sender.sendMessages(message); break; } case "array": { - await sender.send([message]); + await sender.sendMessages([message]); break; } case "batch": { const batch = await sender.createBatch(); assert.isTrue(batch.tryAdd(message)); - await sender.send(batch); + await sender.sendMessages(batch); break; } } diff --git a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts index 0639f05fe9b7..5b57a83a1c79 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts @@ -83,7 +83,7 @@ describe("Streaming", () => { async function testAutoComplete(): Promise { const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedMsgs: ReceivedMessage[] = []; receiver.subscribe({ @@ -121,7 +121,7 @@ describe("Streaming", () => { } async function testAutoCompleteWithSenderAndReceiver(): Promise { const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedMsgs: ReceivedMessage[] = []; receiver.subscribe({ @@ -185,7 +185,7 @@ describe("Streaming", () => { async function testManualComplete(): Promise { const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedMsgs: ReceivedMessageWithLock[] = []; receiver.subscribe( @@ -295,7 +295,7 @@ describe("Streaming", () => { async function testComplete(autoComplete: boolean): Promise { const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedMsgs: ReceivedMessageWithLock[] = []; receiver.subscribe( @@ -378,7 +378,7 @@ describe("Streaming", () => { async function testMultipleAbandons(): Promise { const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); let checkDeliveryCount = 0; @@ -407,7 +407,7 @@ describe("Streaming", () => { await testPeekMsgsLength(receiver, 0); // No messages in the queue - const deadLetterMsgs = await deadLetterReceiver.receiveBatch(1); + const deadLetterMsgs = await deadLetterReceiver.receiveMessages(1); should.equal(Array.isArray(deadLetterMsgs), true, "`ReceivedMessages` is not an array"); should.equal(deadLetterMsgs.length, 1, "Unexpected number of messages"); should.equal( @@ -462,7 +462,7 @@ describe("Streaming", () => { async function testDefer(autoComplete: boolean): Promise { const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); let sequenceNum: any = 0; receiver.subscribe( @@ -568,7 +568,7 @@ describe("Streaming", () => { async function testDeadletter(autoComplete: boolean): Promise { const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedMsgs: ReceivedMessage[] = []; @@ -590,7 +590,7 @@ describe("Streaming", () => { await testPeekMsgsLength(receiver, 0); - const deadLetterMsgs = await deadLetterReceiver.receiveBatch(1); + const deadLetterMsgs = await deadLetterReceiver.receiveMessages(1); should.equal(Array.isArray(deadLetterMsgs), true, "`ReceivedMessages` is not an array"); should.equal(deadLetterMsgs.length, 1, "Unexpected number of messages"); should.equal( @@ -695,7 +695,7 @@ describe("Streaming", () => { errorMessage = ""; try { - await receiver.receiveBatch(1); + await receiver.receiveMessages(1); } catch (err) { errorMessage = err && err.message; } @@ -730,7 +730,7 @@ describe("Streaming", () => { async function testSettlement(operation: DispositionType): Promise { const testMessage = TestMessage.getSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedMsgs: ReceivedMessageWithLock[] = []; receiver.subscribe({ async processMessage(msg: ReceivedMessageWithLock) { @@ -828,7 +828,7 @@ describe("Streaming", () => { }); async function testUserError(): Promise { - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); const errorMessage = "Will we see this error message?"; const receivedMsgs: ReceivedMessageWithLock[] = []; @@ -954,7 +954,7 @@ describe("Streaming", () => { testMessages.forEach((message) => { batchMessageToSend.tryAdd(message); }); - await sender.send(batchMessageToSend); + await sender.sendMessages(batchMessageToSend); const settledMsgs: ReceivedMessage[] = []; const receivedMsgs: ReceivedMessage[] = []; @@ -1073,7 +1073,7 @@ describe("Streaming", () => { messages.push(message); batch.tryAdd(message); } - await sender.send(batch); + await sender.sendMessages(batch); const receivedMsgs: ReceivedMessageWithLock[] = []; @@ -1164,7 +1164,7 @@ describe("Streaming - onDetached", function(): void { // Create the sender and receiver. await beforeEachTest(TestClientType.UnpartitionedQueue, "receiveAndDelete"); // Send a message so we can be sure when the receiver is open and active. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); const receivedErrors: any[] = []; let receiverIsActiveResolver: Function; @@ -1199,7 +1199,7 @@ describe("Streaming - onDetached", function(): void { // Create the sender and receiver. await beforeEachTest(TestClientType.UnpartitionedQueue, "receiveAndDelete"); // Send a message so we can be sure when the receiver is open and active. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); const receivedErrors: any[] = []; let receiverIsActiveResolver: Function; @@ -1234,7 +1234,7 @@ describe("Streaming - onDetached", function(): void { // Create the sender and receiver. await beforeEachTest(TestClientType.UnpartitionedQueue, "receiveAndDelete"); // Send a message so we can be sure when the receiver is open and active. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); const receivedErrors: any[] = []; let receiverIsActiveResolver: Function; @@ -1272,7 +1272,7 @@ describe("Streaming - onDetached", function(): void { // Create the sender and receiver. await beforeEachTest(TestClientType.UnpartitionedQueue, "receiveAndDelete"); // Send a message so we can be sure when the receiver is open and active. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); const receivedErrors: any[] = []; let receiverIsActiveResolver: Function; @@ -1349,7 +1349,7 @@ describe("Streaming - disconnects", function(): void { // Create the sender and receiver. await beforeEachTest(TestClientType.UnpartitionedQueue); // Send a message so we can be sure when the receiver is open and active. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); const receivedErrors: any[] = []; let settledMessageCount = 0; @@ -1407,7 +1407,7 @@ describe("Streaming - disconnects", function(): void { // Otherwise, it will get into a bad internal state with uncaught exceptions. await delay(2000); // send a second message to trigger the message handler again. - await sender.send(TestMessage.getSample()); + await sender.sendMessages(TestMessage.getSample()); // wait for the 2nd message to be received. await receiverSecondMessage; diff --git a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts index 2dc5a9f52016..a5968187e72e 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts @@ -106,7 +106,7 @@ describe("Streaming with sessions", () => { async function testAutoComplete(): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedMsgs: ReceivedMessage[] = []; receiver.subscribe({ @@ -170,7 +170,7 @@ describe("Streaming with sessions", () => { async function testManualComplete(): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedMsgs: ReceivedMessageWithLock[] = []; receiver.subscribe( { @@ -237,7 +237,7 @@ describe("Streaming with sessions", () => { async function testComplete(autoComplete: boolean): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedMsgs: ReceivedMessageWithLock[] = []; receiver.subscribe( @@ -335,7 +335,7 @@ describe("Streaming with sessions", () => { autoComplete: boolean ): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); let abandonFlag = 0; receiver.subscribe( @@ -365,7 +365,7 @@ describe("Streaming with sessions", () => { await createReceiverForTests(testClientType); - const receivedMsgs = await receiver.receiveBatch(1); + const receivedMsgs = await receiver.receiveMessages(1); should.equal(receivedMsgs.length, 1, "Unexpected number of messages"); should.equal( receivedMsgs[0].messageId, @@ -432,7 +432,7 @@ describe("Streaming with sessions", () => { async function testDefer(autoComplete: boolean): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); let sequenceNum: any = 0; receiver.subscribe( @@ -455,7 +455,7 @@ describe("Streaming with sessions", () => { should.equal(unexpectedError, undefined, unexpectedError && unexpectedError.message); - const deferredMsg = await receiver.receiveDeferredMessage(sequenceNum); + const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNum); if (!deferredMsg) { throw "No message received for sequence number"; } @@ -535,7 +535,7 @@ describe("Streaming with sessions", () => { async function testDeadletter(autoComplete: boolean): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); let msgCount = 0; receiver.subscribe( @@ -556,7 +556,7 @@ describe("Streaming with sessions", () => { should.equal(msgCount, 1, "Unexpected number of messages"); await testPeekMsgsLength(receiver, 0); - const deadLetterMsgs = await deadLetterReceiver.receiveBatch(1); + const deadLetterMsgs = await deadLetterReceiver.receiveMessages(1); should.equal(Array.isArray(deadLetterMsgs), true, "`ReceivedMessages` is not an array"); should.equal(deadLetterMsgs.length, 1, "Unexpected number of messages"); should.equal( @@ -663,7 +663,7 @@ describe("Streaming with sessions", () => { errorMessage = ""; try { - await receiver.receiveBatch(1); + await receiver.receiveMessages(1); } catch (err) { errorMessage = err && err.message; } @@ -698,7 +698,7 @@ describe("Streaming with sessions", () => { async function testSettlement(operation: DispositionType): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const receivedMsgs: ReceivedMessageWithLock[] = []; receiver.subscribe({ @@ -811,7 +811,7 @@ describe("Streaming with sessions", () => { async function testUserError(): Promise { const testMessage = TestMessage.getSessionSample(); - await sender.send(testMessage); + await sender.sendMessages(testMessage); const errorMessage = "Will we see this error message?"; const receivedMsgs: ReceivedMessageWithLock[] = []; @@ -865,7 +865,7 @@ describe("Streaming with sessions", () => { for (const message of testMessages) { batchMessageToSend.tryAdd(message); } - await sender.send(batchMessageToSend); + await sender.sendMessages(batchMessageToSend); const settledMsgs: ReceivedMessageWithLock[] = []; const receivedMsgs: ReceivedMessageWithLock[] = []; @@ -1015,7 +1015,7 @@ describe("Streaming with sessions", () => { messages.push(message); batch.tryAdd(message); } - await sender.send(batch); + await sender.sendMessages(batch); const receivedMsgs: ReceivedMessageWithLock[] = []; diff --git a/sdk/servicebus/service-bus/test/utils/testutils2.ts b/sdk/servicebus/service-bus/test/utils/testutils2.ts index 1d27c8f39fc3..a7e45018f794 100644 --- a/sdk/servicebus/service-bus/test/utils/testutils2.ts +++ b/sdk/servicebus/service-bus/test/utils/testutils2.ts @@ -122,7 +122,7 @@ async function createTestEntities( export async function drainAllMessages(receiver: Receiver<{}>): Promise { while (true) { - const messages = await receiver.receiveBatch(10, { maxWaitTimeInMs: 1000 }); + const messages = await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); if (messages.length === 0) { break; @@ -170,7 +170,7 @@ export class ServiceBusTestHelpers { subscription: entityNames.subscription, usesSessions: false }); - receivedMsgs = await receiver.receiveBatch(sentMessages.length, { + receivedMsgs = await receiver.receiveMessages(sentMessages.length, { // maxWaitTime is set same as numberOfMessages being received maxWaitTimeInMs: sentMessages.length * 1000 }); @@ -195,7 +195,7 @@ export class ServiceBusTestHelpers { usesSessions: true, sessionId: id }); - const msgs = await receiver.receiveBatch(numOfMsgsWithSessionId[id], { + const msgs = await receiver.receiveMessages(numOfMsgsWithSessionId[id], { // Since we know the exact number of messages to be received per session-id, // a higher `maxWaitTimeInMs` is not a problem maxWaitTimeInMs: 5000 * numOfMsgsWithSessionId[id] @@ -443,7 +443,7 @@ export function createServiceBusClientForTests( export async function drainReceiveAndDeleteReceiver(receiver: Receiver<{}>): Promise { try { while (true) { - const messages = await receiver.receiveBatch(10, { maxWaitTimeInMs: 1000 }); + const messages = await receiver.receiveMessages(10, { maxWaitTimeInMs: 1000 }); if (messages.length === 0) { break;