diff --git a/sdk/servicebus/service-bus/src/sender.ts b/sdk/servicebus/service-bus/src/sender.ts index 0b366436e638..088d4657e96c 100644 --- a/sdk/servicebus/service-bus/src/sender.ts +++ b/sdk/servicebus/service-bus/src/sender.ts @@ -4,7 +4,7 @@ import Long from "long"; import * as log from "./log"; import { MessageSender } from "./core/messageSender"; -import { ServiceBusMessage } from "./serviceBusMessage"; +import { ServiceBusMessage, isServiceBusMessage } from "./serviceBusMessage"; import { ClientEntityContext } from "./clientEntityContext"; import { getSenderClosedErrorMsg, @@ -247,13 +247,12 @@ export class SenderImpl implements Sender { return this._sender.sendBatch(batch, options); } else if (isServiceBusMessageBatch(messageOrMessagesOrBatch)) { return this._sender.sendBatch(messageOrMessagesOrBatch, options); + } else if (isServiceBusMessage(messageOrMessagesOrBatch)) { + return this._sender.send(messageOrMessagesOrBatch, options); } else { - throwTypeErrorIfParameterMissing( - this._context.namespace.connectionId, - "message, messages or messageBatch", - messageOrMessagesOrBatch + throw new TypeError( + "Invalid type for message. Must be a ServiceBusMessage, an array of ServiceBusMessage or a ServiceBusMessageBatch" ); - return this._sender.send(messageOrMessagesOrBatch, options); } } diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index 04c62abdfbc7..15364611c708 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -668,6 +668,14 @@ export function fromAmqpMessage( return rcvdsbmsg; } +/** + * @internal + * @ignore + */ +export function isServiceBusMessage(possible: any): possible is ServiceBusMessage { + return possible != null && typeof possible === "object" && "body" in possible; +} + /** * Describes the message received from Service Bus. * diff --git a/sdk/servicebus/service-bus/test/internal/sender.spec.ts b/sdk/servicebus/service-bus/test/internal/sender.spec.ts index fc82367a78c9..2d0f3432666a 100644 --- a/sdk/servicebus/service-bus/test/internal/sender.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/sender.spec.ts @@ -5,7 +5,8 @@ import chai from "chai"; import { ServiceBusMessageBatchImpl } from "../../src/serviceBusMessageBatch"; import { ClientEntityContext } from "../../src/clientEntityContext"; import { ServiceBusMessage } from "../../src"; -import { isServiceBusMessageBatch } from "../../src/sender"; +import { isServiceBusMessageBatch, SenderImpl } from "../../src/sender"; +import { DefaultDataTransformer } from "@azure/core-amqp"; const assert = chai.assert; describe("sender unit tests", () => { @@ -18,4 +19,48 @@ describe("sender unit tests", () => { assert.isFalse(isServiceBusMessageBatch((4 as any) as ServiceBusMessage)); assert.isFalse(isServiceBusMessageBatch(({} as any) as ServiceBusMessage)); }); + + ["hello", {}, null, undefined].forEach((invalidValue) => { + it(`don't allow Sender.send(${invalidValue})`, async () => { + const sender = new SenderImpl(createClientEntityContextForTests()); + + try { + await sender.send( + // @ts-expect-error + invalidValue + ); + } catch (err) { + assert.equal(err.name, "TypeError"); + assert.equal( + err.message, + "Invalid type for message. Must be a ServiceBusMessage, an array of ServiceBusMessage or a ServiceBusMessageBatch" + ); + } + }); + }); }); + +function createClientEntityContextForTests(): ClientEntityContext & { initWasCalled: boolean } { + let initWasCalled = false; + + const fakeClientEntityContext = { + entityPath: "queue", + sender: { + credit: 999 + }, + namespace: { + config: { endpoint: "my.service.bus" }, + connectionId: "connection-id", + dataTransformer: new DefaultDataTransformer(), + cbsSession: { + cbsLock: "cbs-lock", + async init() { + initWasCalled = true; + } + } + }, + initWasCalled + }; + + return (fakeClientEntityContext as any) as ReturnType; +} diff --git a/sdk/servicebus/service-bus/test/invalidParameters.spec.ts b/sdk/servicebus/service-bus/test/invalidParameters.spec.ts index da90874203f8..891e0b3e183f 100644 --- a/sdk/servicebus/service-bus/test/invalidParameters.spec.ts +++ b/sdk/servicebus/service-bus/test/invalidParameters.spec.ts @@ -675,34 +675,6 @@ describe("invalid parameters", () => { return serviceBusClient.test.afterEach(); }); - it("Send: Missing message in Sender", async function(): Promise { - let caughtError: Error | undefined; - try { - await sender.send(undefined as any); - } catch (error) { - caughtError = error; - } - should.equal(caughtError && caughtError.name, "TypeError"); - should.equal( - caughtError && caughtError.message, - `Missing parameter "message, messages or messageBatch"` - ); - }); - - it("Sendbatch: Missing messageBatch in Sender", async function(): Promise { - let caughtError: Error | undefined; - try { - await sender.send(undefined as any); - } catch (error) { - caughtError = error; - } - should.equal(caughtError && caughtError.name, "TypeError"); - should.equal( - caughtError && caughtError.message, - `Missing parameter "message, messages or messageBatch"` - ); - }); - it("ScheduledMessage: Missing date in Sender", async function(): Promise { let caughtError: Error | undefined; try { diff --git a/sdk/servicebus/service-bus/test/retries.spec.ts b/sdk/servicebus/service-bus/test/retries.spec.ts index 9569fdf32d16..16485c4316a8 100644 --- a/sdk/servicebus/service-bus/test/retries.spec.ts +++ b/sdk/servicebus/service-bus/test/retries.spec.ts @@ -308,7 +308,11 @@ describe("Retries - MessageSender", () => { it("Unpartitioned Queue: sendBatch", async function(): Promise { await beforeEachTest(TestClientType.UnpartitionedQueue); await mockInitAndVerifyRetries(async () => { - await sender.send(1 as any); + const batch = await sender.createBatch(); + batch.tryAdd({ + body: "hello" + }); + await sender.send(batch); }); }); @@ -329,7 +333,11 @@ describe("Retries - MessageSender", () => { it("Unpartitioned Queue with Sessions: sendBatch", async function(): Promise { await beforeEachTest(TestClientType.UnpartitionedQueue); await mockInitAndVerifyRetries(async () => { - await sender.send(1 as any); + const batch = await sender.createBatch(); + batch.tryAdd({ + body: "hello" + }); + await sender.send(batch); }); }); }); diff --git a/sdk/servicebus/service-bus/test/smoketest.spec.ts b/sdk/servicebus/service-bus/test/smoketest.spec.ts index f4b31473e065..933d65db131f 100644 --- a/sdk/servicebus/service-bus/test/smoketest.spec.ts +++ b/sdk/servicebus/service-bus/test/smoketest.spec.ts @@ -47,7 +47,7 @@ describe("Sample scenarios for track 2", () => { serviceBusClient.createReceiver(queueName, "peekLock") ); - await sendSampleMessage(sender, "Queue, peek/lock"); + await sendSampleMessage(sender, "Queue, peek/lock", undefined, "single"); const errors: string[] = []; const receivedBodies: string[] = []; @@ -70,7 +70,7 @@ describe("Sample scenarios for track 2", () => { serviceBusClient.createReceiver(queueName, "receiveAndDelete") ); - await sendSampleMessage(sender, "Queue, peek/lock, receiveBatch"); + await sendSampleMessage(sender, "Queue, peek/lock, receiveBatch", undefined, "array"); const receivedBodies: string[] = []; @@ -87,7 +87,7 @@ describe("Sample scenarios for track 2", () => { serviceBusClient.createReceiver(queueName, "peekLock") ); - await sendSampleMessage(sender, "Queue, peek/lock, iterate messages"); + await sendSampleMessage(sender, "Queue, peek/lock, iterate messages", undefined, "batch"); // etc... // receiver.getRules(); @@ -160,12 +160,8 @@ describe("Sample scenarios for track 2", () => { continue; } - try { - receivedBodies.push(message.body); - break; - } catch (err) { - throw err; - } + receivedBodies.push(message.body); + break; } await waitAndValidate( @@ -306,12 +302,8 @@ describe("Sample scenarios for track 2", () => { continue; } - try { - receivedBodies.push(message.body); - break; - } catch (err) { - throw err; - } + receivedBodies.push(message.body); + break; } await waitAndValidate( @@ -430,7 +422,12 @@ describe("Sample scenarios for track 2", () => { }); }); - async function sendSampleMessage(sender: Sender, body: string, sessionId?: string) { + async function sendSampleMessage( + sender: Sender, + body: string, + sessionId?: string, + method: "single" | "array" | "batch" = "single" + ): Promise { const message: ServiceBusMessage = { body }; @@ -439,7 +436,22 @@ describe("Sample scenarios for track 2", () => { message.sessionId = sessionId; } - await sender.send(message); + switch (method) { + case "single": { + await sender.send(message); + break; + } + case "array": { + await sender.send([message]); + break; + } + case "batch": { + const batch = await sender.createBatch(); + assert.isTrue(batch.tryAdd(message)); + await sender.send(batch); + break; + } + } } }); @@ -461,7 +473,7 @@ async function waitAndValidate( receivedBodies: string[], errors: string[], receiver: Receiver -) { +): Promise { const maxChecks = 20; let numChecks = 0;