Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Throw an error if the user tries to just send a string (sender.send(string)) #9381

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions sdk/servicebus/service-bus/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import Long from "long";
import * as log from "./log";
import { MessageSender } from "./core/messageSender";
import { ServiceBusMessage } from "./serviceBusMessage";
import { ServiceBusMessage, isServiceBusMessage } from "./serviceBusMessage";
import { ClientEntityContext } from "./clientEntityContext";
import {
getSenderClosedErrorMsg,
Expand Down Expand Up @@ -247,13 +247,12 @@ export class SenderImpl implements Sender {
return this._sender.sendBatch(batch, options);
} else if (isServiceBusMessageBatch(messageOrMessagesOrBatch)) {
return this._sender.sendBatch(messageOrMessagesOrBatch, options);
} else if (isServiceBusMessage(messageOrMessagesOrBatch)) {
return this._sender.send(messageOrMessagesOrBatch, options);
} else {
throwTypeErrorIfParameterMissing(
this._context.namespace.connectionId,
"message, messages or messageBatch",
messageOrMessagesOrBatch
throw new TypeError(
"Invalid type for message. Must be a ServiceBusMessage, an array of ServiceBusMessage or a ServiceBusMessageBatch"
);
return this._sender.send(messageOrMessagesOrBatch, options);
}
}

Expand Down
8 changes: 8 additions & 0 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,14 @@ export function fromAmqpMessage(
return rcvdsbmsg;
}

/**
* @internal
* @ignore
*/
export function isServiceBusMessage(possible: any): possible is ServiceBusMessage {
return possible != null && typeof possible === "object" && "body" in possible;
}

/**
* Describes the message received from Service Bus.
*
Expand Down
47 changes: 46 additions & 1 deletion sdk/servicebus/service-bus/test/internal/sender.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import chai from "chai";
import { ServiceBusMessageBatchImpl } from "../../src/serviceBusMessageBatch";
import { ClientEntityContext } from "../../src/clientEntityContext";
import { ServiceBusMessage } from "../../src";
import { isServiceBusMessageBatch } from "../../src/sender";
import { isServiceBusMessageBatch, SenderImpl } from "../../src/sender";
import { DefaultDataTransformer } from "@azure/core-amqp";
const assert = chai.assert;

describe("sender unit tests", () => {
Expand All @@ -18,4 +19,48 @@ describe("sender unit tests", () => {
assert.isFalse(isServiceBusMessageBatch((4 as any) as ServiceBusMessage));
assert.isFalse(isServiceBusMessageBatch(({} as any) as ServiceBusMessage));
});

["hello", {}, null, undefined].forEach((invalidValue) => {
it(`don't allow Sender.send(${invalidValue})`, async () => {
const sender = new SenderImpl(createClientEntityContextForTests());

try {
await sender.send(
// @ts-expect-error
invalidValue
);
} catch (err) {
assert.equal(err.name, "TypeError");
assert.equal(
err.message,
"Invalid type for message. Must be a ServiceBusMessage, an array of ServiceBusMessage or a ServiceBusMessageBatch"
);
}
});
});
});

function createClientEntityContextForTests(): ClientEntityContext & { initWasCalled: boolean } {
let initWasCalled = false;

const fakeClientEntityContext = {
entityPath: "queue",
sender: {
credit: 999
},
namespace: {
config: { endpoint: "my.service.bus" },
connectionId: "connection-id",
dataTransformer: new DefaultDataTransformer(),
cbsSession: {
cbsLock: "cbs-lock",
async init() {
initWasCalled = true;
}
}
},
initWasCalled
};

return (fakeClientEntityContext as any) as ReturnType<typeof createClientEntityContextForTests>;
}
28 changes: 0 additions & 28 deletions sdk/servicebus/service-bus/test/invalidParameters.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -675,34 +675,6 @@ describe("invalid parameters", () => {
return serviceBusClient.test.afterEach();
});

it("Send: Missing message in Sender", async function(): Promise<void> {
let caughtError: Error | undefined;
try {
await sender.send(undefined as any);
} catch (error) {
caughtError = error;
}
should.equal(caughtError && caughtError.name, "TypeError");
should.equal(
caughtError && caughtError.message,
`Missing parameter "message, messages or messageBatch"`
);
});

it("Sendbatch: Missing messageBatch in Sender", async function(): Promise<void> {
let caughtError: Error | undefined;
try {
await sender.send(undefined as any);
} catch (error) {
caughtError = error;
}
should.equal(caughtError && caughtError.name, "TypeError");
should.equal(
caughtError && caughtError.message,
`Missing parameter "message, messages or messageBatch"`
);
});

it("ScheduledMessage: Missing date in Sender", async function(): Promise<void> {
let caughtError: Error | undefined;
try {
Expand Down
12 changes: 10 additions & 2 deletions sdk/servicebus/service-bus/test/retries.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,11 @@ describe("Retries - MessageSender", () => {
it("Unpartitioned Queue: sendBatch", async function(): Promise<void> {
await beforeEachTest(TestClientType.UnpartitionedQueue);
await mockInitAndVerifyRetries(async () => {
await sender.send(1 as any);
const batch = await sender.createBatch();
batch.tryAdd({
body: "hello"
});
await sender.send(batch);
});
});

Expand All @@ -329,7 +333,11 @@ describe("Retries - MessageSender", () => {
it("Unpartitioned Queue with Sessions: sendBatch", async function(): Promise<void> {
await beforeEachTest(TestClientType.UnpartitionedQueue);
await mockInitAndVerifyRetries(async () => {
await sender.send(1 as any);
const batch = await sender.createBatch();
batch.tryAdd({
body: "hello"
});
await sender.send(batch);
});
});
});
Expand Down
48 changes: 30 additions & 18 deletions sdk/servicebus/service-bus/test/smoketest.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe("Sample scenarios for track 2", () => {
serviceBusClient.createReceiver(queueName, "peekLock")
);

await sendSampleMessage(sender, "Queue, peek/lock");
await sendSampleMessage(sender, "Queue, peek/lock", undefined, "single");

const errors: string[] = [];
const receivedBodies: string[] = [];
Expand All @@ -70,7 +70,7 @@ describe("Sample scenarios for track 2", () => {
serviceBusClient.createReceiver(queueName, "receiveAndDelete")
);

await sendSampleMessage(sender, "Queue, peek/lock, receiveBatch");
await sendSampleMessage(sender, "Queue, peek/lock, receiveBatch", undefined, "array");

const receivedBodies: string[] = [];

Expand All @@ -87,7 +87,7 @@ describe("Sample scenarios for track 2", () => {
serviceBusClient.createReceiver(queueName, "peekLock")
);

await sendSampleMessage(sender, "Queue, peek/lock, iterate messages");
await sendSampleMessage(sender, "Queue, peek/lock, iterate messages", undefined, "batch");

// etc...
// receiver.getRules();
Expand Down Expand Up @@ -160,12 +160,8 @@ describe("Sample scenarios for track 2", () => {
continue;
}

try {
receivedBodies.push(message.body);
break;
} catch (err) {
throw err;
}
receivedBodies.push(message.body);
break;
}

await waitAndValidate(
Expand Down Expand Up @@ -306,12 +302,8 @@ describe("Sample scenarios for track 2", () => {
continue;
}

try {
receivedBodies.push(message.body);
break;
} catch (err) {
throw err;
}
receivedBodies.push(message.body);
break;
}

await waitAndValidate(
Expand Down Expand Up @@ -430,7 +422,12 @@ describe("Sample scenarios for track 2", () => {
});
});

async function sendSampleMessage(sender: Sender, body: string, sessionId?: string) {
async function sendSampleMessage(
sender: Sender,
body: string,
sessionId?: string,
method: "single" | "array" | "batch" = "single"
): Promise<void> {
const message: ServiceBusMessage = {
body
};
Expand All @@ -439,7 +436,22 @@ describe("Sample scenarios for track 2", () => {
message.sessionId = sessionId;
}

await sender.send(message);
switch (method) {
case "single": {
await sender.send(message);
break;
}
case "array": {
await sender.send([message]);
break;
}
case "batch": {
const batch = await sender.createBatch();
assert.isTrue(batch.tryAdd(message));
await sender.send(batch);
break;
}
}
}
});

Expand All @@ -461,7 +473,7 @@ async function waitAndValidate(
receivedBodies: string[],
errors: string[],
receiver: Receiver<ReceivedMessage>
) {
): Promise<void> {
const maxChecks = 20;
let numChecks = 0;

Expand Down