From b7adad9c736d95727cce2266e9345ed030ea0fe5 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Mon, 15 Nov 2021 14:51:17 -0800 Subject: [PATCH 1/7] [ServiceBus] Add optional boolean `skipParsingBodyAsJson` option to `ReceiveMessagesOptions`, `SubscribeOptions`, and `ServiceBusSessionReceiverOptions`. This allows users to control whether the SDK should skip parsing message body as Json object. By default the SDK will attempt parsing message body as Json object. While updating code, I also fixed sample unit test which is using out-dated paths thus not finding any sample code files, and removed two `await` that are redundant. Resolves #18630 --- .../service-bus/review/service-bus.api.md | 3 ++ .../service-bus/src/core/batchingReceiver.ts | 9 ++-- .../service-bus/src/core/managementClient.ts | 19 +++++-- .../service-bus/src/core/streamingReceiver.ts | 3 +- .../service-bus/src/dataTransformer.ts | 16 ++++-- sdk/servicebus/service-bus/src/models.ts | 18 +++++++ .../service-bus/src/receivers/receiver.ts | 3 +- .../service-bus/src/serviceBusClient.ts | 6 ++- .../service-bus/src/serviceBusMessage.ts | 40 +++++++++------ .../service-bus/src/session/messageSession.ts | 7 ++- .../test/internal/node/samples.spec.ts | 4 +- .../test/internal/unit/abortSignal.spec.ts | 3 +- .../test/internal/unit/amqpUnitTests.spec.ts | 3 +- .../internal/unit/batchingReceiver.spec.ts | 19 ++++--- .../internal/unit/dataTransformer.spec.ts | 49 +++++++++++-------- .../internal/unit/linkentity.unittest.spec.ts | 3 +- .../test/internal/unit/messageSession.spec.ts | 18 ++++--- .../unit/receivedMessageProps.spec.ts | 2 +- .../test/internal/unit/receiver.spec.ts | 3 +- .../internal/unit/serviceBusMessage.spec.ts | 14 ++++-- .../test/internal/unit/tracing.spec.ts | 3 +- 21 files changed, 169 insertions(+), 76 deletions(-) diff --git a/sdk/servicebus/service-bus/review/service-bus.api.md b/sdk/servicebus/service-bus/review/service-bus.api.md index 7e70407ab3b..3be716f79ec 100644 --- a/sdk/servicebus/service-bus/review/service-bus.api.md +++ b/sdk/servicebus/service-bus/review/service-bus.api.md @@ -228,6 +228,7 @@ export interface QueueRuntimeProperties { // @public export interface ReceiveMessagesOptions extends OperationOptionsBase { maxWaitTimeInMs?: number; + skipParsingBodyAsJson?: boolean; } export { RetryMode } @@ -488,6 +489,7 @@ export interface ServiceBusSessionReceiver extends ServiceBusReceiver { export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase { maxAutoLockRenewalDurationInMs?: number; receiveMode?: "peekLock" | "receiveAndDelete"; + skipParsingBodyAsJson?: boolean; } // @public @@ -510,6 +512,7 @@ export interface SqlRuleFilter { export interface SubscribeOptions extends OperationOptionsBase { autoCompleteMessages?: boolean; maxConcurrentCalls?: number; + skipParsingBodyAsJson?: boolean; } // @public diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index e21d125163c..d00c59ab4b7 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -69,7 +69,8 @@ export class BatchingReceiver extends MessageReceiver { return this.link; }, - this.receiveMode + this.receiveMode, + options.skipParsingBodyAsJson ?? false ); } @@ -248,7 +249,8 @@ export class BatchingReceiverLite { private _getCurrentReceiver: ( abortSignal?: AbortSignalLike ) => Promise, - private _receiveMode: ReceiveMode + private _receiveMode: ReceiveMode, + _skipParsingBodyAsJson: boolean ) { this._createAndEndProcessingSpan = createAndEndProcessingSpan; @@ -257,7 +259,8 @@ export class BatchingReceiverLite { context.message!, context.delivery!, true, - this._receiveMode + this._receiveMode, + _skipParsingBodyAsJson ); }; diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index 5f7c07186fa..ea5f0286339 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -63,6 +63,12 @@ export interface SendManagementRequestOptions extends SendRequestOptions { * This is used for service side optimization. */ associatedLinkName?: string; + /** + * Option to disable the client from running JSON.parse() on the message body when receiving the message. + * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you + * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. + */ + skipParsingBodyAsJson?: boolean; } /** @@ -498,9 +504,15 @@ export class ManagementClient extends LinkEntity { const messages = result.body.messages as { message: Buffer }[]; for (const msg of messages) { const decodedMessage = RheaMessageUtil.decode(msg.message); - const message = fromRheaMessage(decodedMessage as any); + const message = fromRheaMessage( + decodedMessage as any, + options?.skipParsingBodyAsJson ?? false + ); - message.body = defaultDataTransformer.decode(message.body); + message.body = defaultDataTransformer.decode( + message.body, + options?.skipParsingBodyAsJson ?? false + ); messageList.push(message); this._lastPeekedSequenceNumber = message.sequenceNumber!; } @@ -812,7 +824,8 @@ export class ManagementClient extends LinkEntity { decodedMessage as any, { tag: msg["lock-token"] } as any, false, - receiveMode + receiveMode, + options?.skipParsingBodyAsJson ?? false ); messageList.push(message); } diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 6251e79824b..f245ae863e6 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -233,7 +233,8 @@ export class StreamingReceiver extends MessageReceiver { context.message!, context.delivery!, true, - this.receiveMode + this.receiveMode, + options.skipParsingBodyAsJson ?? false ); this._lockRenewer?.start(this, bMessage, (err) => { diff --git a/sdk/servicebus/service-bus/src/dataTransformer.ts b/sdk/servicebus/service-bus/src/dataTransformer.ts index 7cc6ab0a2ed..3a81679d327 100644 --- a/sdk/servicebus/service-bus/src/dataTransformer.ts +++ b/sdk/servicebus/service-bus/src/dataTransformer.ts @@ -64,16 +64,17 @@ export const defaultDataTransformer = { * of the AMQP mesage. * * @param body - The AMQP message body + * @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body content. * @return decoded body or the given body as-is. */ - decode(body: unknown): unknown { + decode(body: unknown, skipParsingBodyAsJson: boolean): unknown { let actualContent = body; if (isRheaAmqpSection(body)) { actualContent = body.content; } - return tryToJsonDecode(actualContent); + return skipParsingBodyAsJson ? actualContent : tryToJsonDecode(actualContent); }, /** * A function that takes the body property from an AMQP message, which can come from either @@ -83,16 +84,21 @@ export const defaultDataTransformer = { * indicating which part of the AMQP message the body was decoded from. * * @param body - The AMQP message body as received from rhea. + * @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body. * @return The decoded/raw body and the body type. */ decodeWithType( - body: unknown | RheaAmqpSection + body: unknown | RheaAmqpSection, + skipParsingBodyAsJson: boolean ): { body: unknown; bodyType: "data" | "sequence" | "value" } { try { if (isRheaAmqpSection(body)) { switch (body.typecode) { case dataSectionTypeCode: - return { body: tryToJsonDecode(body.content), bodyType: "data" }; + return { + body: skipParsingBodyAsJson ? body.content : tryToJsonDecode(body.content), + bodyType: "data" + }; case sequenceSectionTypeCode: // typecode: // handle sequences @@ -105,7 +111,7 @@ export const defaultDataTransformer = { // not sure - we have to try to infer the proper bodyType and content if (isBuffer(body)) { // This indicates that we are getting the AMQP described type. Let us try decoding it. - return { body: tryToJsonDecode(body), bodyType: "data" }; + return { body: skipParsingBodyAsJson ? body : tryToJsonDecode(body), bodyType: "data" }; } else { return { body: body, bodyType: "value" }; } diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index 254ba299c68..70ad66d9c33 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -165,6 +165,12 @@ export interface ReceiveMessagesOptions extends OperationOptionsBase { * **Default**: `60000` milliseconds. */ maxWaitTimeInMs?: number; + /** + * Option to disable the client from running JSON.parse() on the message body when receiving the message. + * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you + * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. + */ + skipParsingBodyAsJson?: boolean; } /** @@ -199,6 +205,12 @@ export interface SubscribeOptions extends OperationOptionsBase { * - **Default**: `1`. */ maxConcurrentCalls?: number; + /** + * Option to disable the client from running JSON.parse() on the message body when receiving the message. + * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you + * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. + */ + skipParsingBodyAsJson?: boolean; } /** @@ -233,6 +245,12 @@ export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase { * - **To disable autolock renewal**, set this to `0`. */ maxAutoLockRenewalDurationInMs?: number; + /** + * Option to disable the client from running JSON.parse() on the message body when receiving the message. + * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you + * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. + */ + skipParsingBodyAsJson?: boolean; } /** diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index e897fb6cc19..45a603831a4 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -357,7 +357,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { const receiveOptions: ReceiveOptions = { maxConcurrentCalls: 0, receiveMode: this.receiveMode, - lockRenewer: this._lockRenewer + lockRenewer: this._lockRenewer, + skipParsingBodyAsJson: options?.skipParsingBodyAsJson }; this._batchingReceiver = this._createBatchingReceiver( this._context, diff --git a/sdk/servicebus/service-bus/src/serviceBusClient.ts b/sdk/servicebus/service-bus/src/serviceBusClient.ts index ac4019a021c..fa2a5d1b443 100644 --- a/sdk/servicebus/service-bus/src/serviceBusClient.ts +++ b/sdk/servicebus/service-bus/src/serviceBusClient.ts @@ -321,7 +321,8 @@ export class ServiceBusClient { maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs, receiveMode, abortSignal: options?.abortSignal, - retryOptions: this._clientOptions.retryOptions + retryOptions: this._clientOptions.retryOptions, + skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false } ); @@ -406,7 +407,8 @@ export class ServiceBusClient { maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs, receiveMode, abortSignal: options?.abortSignal, - retryOptions: this._clientOptions.retryOptions + retryOptions: this._clientOptions.retryOptions, + skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false } ); diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index 06c037f0f58..94d949a42c8 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -504,6 +504,7 @@ export interface ServiceBusReceivedMessage extends ServiceBusMessage { */ export function fromRheaMessage( rheaMessage: RheaMessage, + skipParsingBodyAsJson: boolean, delivery?: Delivery, shouldReorderLockToken?: boolean ): ServiceBusReceivedMessage { @@ -513,7 +514,10 @@ export function fromRheaMessage( }; } - const { body, bodyType } = defaultDataTransformer.decodeWithType(rheaMessage.body); + const { body, bodyType } = defaultDataTransformer.decodeWithType( + rheaMessage.body, + skipParsingBodyAsJson + ); const sbmsg: ServiceBusMessage = { body: body @@ -872,10 +876,12 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage { msg: RheaMessage, delivery: Delivery, shouldReorderLockToken: boolean, - receiveMode: ReceiveMode + receiveMode: ReceiveMode, + skipParsingBodyAsJson: boolean ) { const { _rawAmqpMessage, ...restOfMessageProps } = fromRheaMessage( msg, + skipParsingBodyAsJson, delivery, shouldReorderLockToken ); @@ -886,22 +892,26 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage { this.lockToken = undefined; } - let actualBodyType: - | ReturnType["bodyType"] - | undefined = undefined; + // let actualBodyType: + // | ReturnType["bodyType"] + // | undefined = undefined; - if (msg.body) { - try { - const result = defaultDataTransformer.decodeWithType(msg.body); + // if (msg.body) { + // try { + // const result = defaultDataTransformer.decodeWithType(msg.body); + + // this.body = result.body; + // actualBodyType = result.bodyType; + // } catch (err) { + // this.body = undefined; + // } + // } + // why above when `fromRheaMessage()` already called `defaultDataTransformer.decodeWithType()` earlier on message body + this.body = restOfMessageProps.body; - this.body = result.body; - actualBodyType = result.bodyType; - } catch (err) { - this.body = undefined; - } - } this._rawAmqpMessage = _rawAmqpMessage; - this._rawAmqpMessage.bodyType = actualBodyType; + // `_rawAmqpMessage.bodyType` is also assigned already in `fromRheaMessage()` + // this._rawAmqpMessage.bodyType = actualBodyType; this.delivery = delivery; } diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index f38a9fb075f..3d8714980b5 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -62,6 +62,7 @@ export type MessageSessionOptions = Pick< > & { receiveMode?: ReceiveMode; retryOptions: RetryOptions | undefined; + skipParsingBodyAsJson: boolean; }; /** @@ -389,7 +390,8 @@ export class MessageSession extends LinkEntity { async (_abortSignal?: AbortSignalLike): Promise => { return this.link!; }, - this.receiveMode + this.receiveMode, + options.skipParsingBodyAsJson ); // setting all the handlers @@ -628,7 +630,8 @@ export class MessageSession extends LinkEntity { context.message!, context.delivery!, true, - this.receiveMode + this.receiveMode, + options?.skipParsingBodyAsJson ?? false ); try { diff --git a/sdk/servicebus/service-bus/test/internal/node/samples.spec.ts b/sdk/servicebus/service-bus/test/internal/node/samples.spec.ts index 553517cd30e..00371c57fc0 100644 --- a/sdk/servicebus/service-bus/test/internal/node/samples.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/node/samples.spec.ts @@ -34,13 +34,13 @@ describe("Ensure typescript samples use published package", function(): void { } it("Ensure TypeScript samples use published package", async () => { - const pattern = "samples/typescript/src/**/*.ts"; + const pattern = "samples/v7/typescript/src/**/*.ts"; const files = await globAsync(pattern); testSamples(files, new RegExp('from\\s"@azure/service-bus"')); }); it("Ensure JavaScript samples use published package", async () => { - const pattern = "samples/javascript/**/*.js"; + const pattern = "samples/v7/javascript/**/*.js"; const files = await globAsync(pattern); testSamples(files, new RegExp('=\\srequire\\("@azure/service-bus"\\)')); }); diff --git a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts index 571ad967121..84253ff603c 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts @@ -357,7 +357,8 @@ describe("AbortSignal", () => { const connectionContext = createConnectionContextForTestsWithSessionId(); const messageSession = await MessageSession.create(connectionContext, "entityPath", "hello", { - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false }); const session = new ServiceBusSessionReceiverImpl( diff --git a/sdk/servicebus/service-bus/test/internal/unit/amqpUnitTests.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/amqpUnitTests.spec.ts index 4dccc02c4a3..41e3ee64971 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/amqpUnitTests.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/amqpUnitTests.spec.ts @@ -46,7 +46,8 @@ describe("AMQP message encoding", () => { } as any) as Message, {} as Delivery, false, - "receiveAndDelete" + "receiveAndDelete", + false ); it("isAmqpAnnotatedMessage", () => { diff --git a/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts index b22b09bf163..b3dcc870b4a 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts @@ -526,7 +526,8 @@ describe("BatchingReceiver unit tests", () => { async () => { return fakeRheaReceiver; }, - "peekLock" + "peekLock", + false ); assert.isFalse(batchingReceiver.isReceivingMessages); @@ -541,7 +542,7 @@ describe("BatchingReceiver unit tests", () => { assert.isTrue(batchingReceiver.isReceivingMessages); await receiveIsReady; - await clock.tick(10 + 1); + clock.tick(10 + 1); await prm; assert.isFalse(batchingReceiver.isReceivingMessages); @@ -556,7 +557,8 @@ describe("BatchingReceiver unit tests", () => { async () => { return fakeRheaReceiver; }, - "peekLock" + "peekLock", + false ); assert.notExists(batchingReceiver["_closeHandler"]); @@ -572,7 +574,7 @@ describe("BatchingReceiver unit tests", () => { await receiveIsReady; assert.exists(batchingReceiver["_closeHandler"]); - await batchingReceiver.terminate(new Error("actual error")); + batchingReceiver.terminate(new Error("actual error")); try { await receiveMessagesPromise; @@ -591,7 +593,8 @@ describe("BatchingReceiver unit tests", () => { async () => { return fakeRheaReceiver; }, - "peekLock" + "peekLock", + false ); assert.notExists(batchingReceiver["_closeHandler"]); @@ -644,7 +647,8 @@ describe("BatchingReceiver unit tests", () => { async () => { return fakeRheaReceiver; }, - "peekLock" + "peekLock", + false ); batchingReceiverLite["_receiveMessagesImpl"]( @@ -701,7 +705,8 @@ describe("BatchingReceiver unit tests", () => { async () => { return fakeRheaReceiver; }, - "peekLock" + "peekLock", + false ); const receiveIsReady = getReceiveIsReadyPromise(batchingReceiverLite); diff --git a/sdk/servicebus/service-bus/test/internal/unit/dataTransformer.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/dataTransformer.spec.ts index aea607ecd8f..5e13fdb5974 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/dataTransformer.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/dataTransformer.spec.ts @@ -45,16 +45,25 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(stringBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); decoded.should.equal(stringBody); done(); }); + it("should not decode a message body when skipParsingBodyAsJson is specified", function(done) { + const encoded: any = transformer.encode(stringBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(true); + const decoded: any = transformer.decode(encoded, true); + decoded.should.equal(encoded.content); + done(); + }); + it("should correctly encode/decode a number message body", function(done) { const encoded: any = transformer.encode(numberBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); decoded.should.equal(numberBody); done(); }); @@ -63,7 +72,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(booleanBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); decoded.should.equal(booleanBody); done(); }); @@ -72,7 +81,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(nullBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); should.equal(decoded, nullBody); done(); }); @@ -81,7 +90,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(undefinedBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); should.equal(decoded, nullBody); done(); }); @@ -90,7 +99,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(emptyStringBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); decoded.should.equal(emptyStringBody); done(); }); @@ -99,7 +108,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(arrayBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); assert.deepEqual(decoded, arrayBody); done(); }); @@ -108,7 +117,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(objectBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); assert.deepEqual(decoded, objectBody); done(); }); @@ -117,7 +126,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(bufferBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); assert.deepEqual(decoded, bufferBody); done(); }); @@ -126,7 +135,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(hexBufferBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); assert.deepEqual(decoded, hexBufferBody); done(); }); @@ -135,61 +144,61 @@ describe("DataTransformer", function() { // It is possible that we receive an AMQP value type from the messages that were sent with // previously shipped version of the sdk. If so then we should be able to handle those scenarios. it("should correctly decode a string message body", function(done) { - const decoded: any = transformer.decode(stringBody); + const decoded: any = transformer.decode(stringBody, false); decoded.should.equal(stringBody); done(); }); it("should correctly decode a number message body", function(done) { - const decoded: any = transformer.decode(numberBody); + const decoded: any = transformer.decode(numberBody, false); decoded.should.equal(numberBody); done(); }); it("should correctly decode a boolean message body", function(done) { - const decoded: any = transformer.decode(booleanBody); + const decoded: any = transformer.decode(booleanBody, false); decoded.should.equal(booleanBody); done(); }); it("should correctly decode a null message body", function(done) { - const decoded: any = transformer.decode(nullBody); + const decoded: any = transformer.decode(nullBody, false); should.equal(decoded, nullBody); done(); }); it("should correctly decode an undefined message body", function(done) { - const decoded: any = transformer.decode(undefinedBody); + const decoded: any = transformer.decode(undefinedBody, false); should.equal(decoded, undefined); done(); }); it("should correctly decode an empty string message body", function(done) { - const decoded: any = transformer.decode(emptyStringBody); + const decoded: any = transformer.decode(emptyStringBody, false); decoded.should.equal(emptyStringBody); done(); }); it("should correctly decode an array message body", function(done) { - const decoded: any = transformer.decode(arrayBody); + const decoded: any = transformer.decode(arrayBody, false); assert.deepEqual(decoded, arrayBody); done(); }); it("should correctly decode an object message body", function(done) { - const decoded: any = transformer.decode(objectBody); + const decoded: any = transformer.decode(objectBody, false); assert.deepEqual(decoded, objectBody); done(); }); it("should correctly decode a buffer message body", function(done) { - const decoded: any = transformer.decode(bufferBody); + const decoded: any = transformer.decode(bufferBody, false); assert.deepEqual(decoded, bufferBody); done(); }); it("should correctly decode a hex buffer message body", function(done) { - const decoded: any = transformer.decode(hexBufferBody); + const decoded: any = transformer.decode(hexBufferBody, false); assert.deepEqual(decoded, hexBufferBody); done(); }); diff --git a/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts index f534cbdda46..6350fe05ddd 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts @@ -410,7 +410,8 @@ describe("LinkEntity unit tests", () => { it("session", () => { const messageSession = new MessageSession(connectionContext, "entityPath", "session-id", { abortSignal: undefined, - retryOptions: {} + retryOptions: {}, + skipParsingBodyAsJson: false }); initCachedLinks(messageSession.name); diff --git a/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts index b915d982377..92b9c1dc19a 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts @@ -63,7 +63,8 @@ describe("Message session unit tests", () => { undefined, { receiveMode: lockMode, - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -93,7 +94,8 @@ describe("Message session unit tests", () => { undefined, { receiveMode: lockMode, - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -123,7 +125,8 @@ describe("Message session unit tests", () => { undefined, { receiveMode: lockMode, - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -169,7 +172,8 @@ describe("Message session unit tests", () => { undefined, { receiveMode: lockMode, - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -221,7 +225,8 @@ describe("Message session unit tests", () => { undefined, { receiveMode: lockMode, - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -369,7 +374,8 @@ describe("Message session unit tests", () => { "session id", { receiveMode: "receiveAndDelete", - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); diff --git a/sdk/servicebus/service-bus/test/internal/unit/receivedMessageProps.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receivedMessageProps.spec.ts index 14286de7fe1..dd13910585c 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receivedMessageProps.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receivedMessageProps.spec.ts @@ -24,7 +24,7 @@ describe("Message translations", () => { ...toRheaMessage(testMessage, { encode: (body) => body }), message_annotations: { [Constants.enqueuedTime]: Date.now().valueOf() } }; - const expiresAtUtc = fromRheaMessage(rheaMsg).expiresAtUtc; + const expiresAtUtc = fromRheaMessage(rheaMsg, false).expiresAtUtc; should.not.equal(expiresAtUtc?.toString(), "Invalid Date", "expiresAtUtc is Invalid Date"); } diff --git a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts index f8b3080d9bd..d33d60ee4b1 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts @@ -237,7 +237,8 @@ describe("Receiver unit tests", () => { "entity path", undefined, { - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); diff --git a/sdk/servicebus/service-bus/test/internal/unit/serviceBusMessage.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/serviceBusMessage.spec.ts index 1db7da374d0..2b3c3223636 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/serviceBusMessage.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/serviceBusMessage.spec.ts @@ -42,7 +42,8 @@ describe("ServiceBusMessageImpl unit tests", () => { amqpMessage, { tag: fakeDeliveryTag } as Delivery, false, - "peekLock" + "peekLock", + false ); assert.equal(sbMessage.lockToken, expectedLockToken, "Unexpected lock token found"); @@ -53,7 +54,8 @@ describe("ServiceBusMessageImpl unit tests", () => { amqpMessage, { tag: fakeDeliveryTag } as Delivery, false, - "receiveAndDelete" + "receiveAndDelete", + false ); assert.equal(!!sbMessage.lockToken, false, "Unexpected lock token found"); @@ -105,7 +107,13 @@ describe("ServiceBusMessageImpl unit tests", () => { user_id: "random_user_id" }; - const sbMessage = new ServiceBusMessageImpl(amqpMessage, fakeDelivery, false, "peekLock"); + const sbMessage = new ServiceBusMessageImpl( + amqpMessage, + fakeDelivery, + false, + "peekLock", + false + ); it("headers match", () => { assert.equal(sbMessage._rawAmqpMessage.header?.firstAcquirer, amqpMessage.first_acquirer); diff --git a/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts index 8d6c560ae8b..fdcb9d84171 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts @@ -61,7 +61,8 @@ describe("Tracing tests", () => { createConnectionContextForTests(), "my entity path", async () => (({} as any) as Receiver), - "peekLock" + "peekLock", + false ); br["_createAndEndProcessingSpan"] = createSpanStub; From 1b91bc49eb516c79539a1dcd883498886d1620ab Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 18 Nov 2021 13:43:11 -0800 Subject: [PATCH 2/7] Add optional `skipParsingBodyAsJson` boolean property to `ServiceBusReceiverOptions` and remove those on `ReceiveMessagesOptions` and `SubscribeOptions` --- .../service-bus/review/service-bus.api.md | 3 +-- .../service-bus/src/core/messageReceiver.ts | 1 + sdk/servicebus/service-bus/src/models.ts | 18 +++++--------- .../service-bus/src/receivers/receiver.ts | 6 +++-- .../service-bus/src/serviceBusClient.ts | 1 + .../service-bus/src/session/messageSession.ts | 7 ++++-- .../service-bus/test/internal/retries.spec.ts | 3 ++- .../test/internal/unit/abortSignal.spec.ts | 6 +++-- .../internal/unit/batchingReceiver.spec.ts | 24 ++++++++++++------- .../internal/unit/linkentity.unittest.spec.ts | 2 ++ .../test/internal/unit/receiver.spec.ts | 22 ++++++++++------- .../unit/serviceBusReceiverUnitTests.spec.ts | 3 ++- .../internal/unit/streamingReceiver.spec.ts | 6 +++-- .../test/internal/unit/tracing.spec.ts | 8 ++++++- .../test/internal/unit/unittestUtils.ts | 3 ++- 15 files changed, 71 insertions(+), 42 deletions(-) diff --git a/sdk/servicebus/service-bus/review/service-bus.api.md b/sdk/servicebus/service-bus/review/service-bus.api.md index 3be716f79ec..143b75c14a0 100644 --- a/sdk/servicebus/service-bus/review/service-bus.api.md +++ b/sdk/servicebus/service-bus/review/service-bus.api.md @@ -228,7 +228,6 @@ export interface QueueRuntimeProperties { // @public export interface ReceiveMessagesOptions extends OperationOptionsBase { maxWaitTimeInMs?: number; - skipParsingBodyAsJson?: boolean; } export { RetryMode } @@ -459,6 +458,7 @@ export interface ServiceBusReceiver { export interface ServiceBusReceiverOptions { maxAutoLockRenewalDurationInMs?: number; receiveMode?: "peekLock" | "receiveAndDelete"; + skipParsingBodyAsJson?: boolean; subQueueType?: "deadLetter" | "transferDeadLetter"; } @@ -512,7 +512,6 @@ export interface SqlRuleFilter { export interface SubscribeOptions extends OperationOptionsBase { autoCompleteMessages?: boolean; maxConcurrentCalls?: number; - skipParsingBodyAsJson?: boolean; } // @public diff --git a/sdk/servicebus/service-bus/src/core/messageReceiver.ts b/sdk/servicebus/service-bus/src/core/messageReceiver.ts index 3b204dd2aad..89ae4f98044 100644 --- a/sdk/servicebus/service-bus/src/core/messageReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/messageReceiver.ts @@ -51,6 +51,7 @@ export interface ReceiveOptions extends SubscribeOptions { * maxAutoRenewLockDurationInMs value when they created their receiver. */ lockRenewer: LockRenewer | undefined; + skipParsingBodyAsJson: boolean; } /** diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index 70ad66d9c33..0d890d9d8f3 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -136,6 +136,12 @@ export interface ServiceBusReceiverOptions { * - **To disable autolock renewal**, set this to `0`. */ maxAutoLockRenewalDurationInMs?: number; + /** + * Option to disable the client from running JSON.parse() on the message body when receiving the message. + * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you + * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. + */ + skipParsingBodyAsJson?: boolean; } /** @@ -165,12 +171,6 @@ export interface ReceiveMessagesOptions extends OperationOptionsBase { * **Default**: `60000` milliseconds. */ maxWaitTimeInMs?: number; - /** - * Option to disable the client from running JSON.parse() on the message body when receiving the message. - * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you - * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. - */ - skipParsingBodyAsJson?: boolean; } /** @@ -205,12 +205,6 @@ export interface SubscribeOptions extends OperationOptionsBase { * - **Default**: `1`. */ maxConcurrentCalls?: number; - /** - * Option to disable the client from running JSON.parse() on the message body when receiving the message. - * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you - * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. - */ - skipParsingBodyAsJson?: boolean; } /** diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index 45a603831a4..0faf6af5a07 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -296,6 +296,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { public entityPath: string, public receiveMode: "peekLock" | "receiveAndDelete", maxAutoRenewLockDurationInMs: number, + private skipParsingBodyAsJson: boolean, retryOptions: RetryOptions = {} ) { throwErrorIfConnectionClosed(_context); @@ -358,7 +359,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { maxConcurrentCalls: 0, receiveMode: this.receiveMode, lockRenewer: this._lockRenewer, - skipParsingBodyAsJson: options?.skipParsingBodyAsJson + skipParsingBodyAsJson: this.skipParsingBodyAsJson }; this._batchingReceiver = this._createBatchingReceiver( this._context, @@ -508,7 +509,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { ...options, receiveMode: this.receiveMode, retryOptions: this._retryOptions, - lockRenewer: this._lockRenewer + lockRenewer: this._lockRenewer, + skipParsingBodyAsJson: this.skipParsingBodyAsJson }); // this ensures that if the outer service bus client is closed that this receiver is cleaned up. diff --git a/sdk/servicebus/service-bus/src/serviceBusClient.ts b/sdk/servicebus/service-bus/src/serviceBusClient.ts index fa2a5d1b443..3f987393bbc 100644 --- a/sdk/servicebus/service-bus/src/serviceBusClient.ts +++ b/sdk/servicebus/service-bus/src/serviceBusClient.ts @@ -211,6 +211,7 @@ export class ServiceBusClient { entityPathWithSubQueue, receiveMode, maxLockAutoRenewDurationInMs, + options?.skipParsingBodyAsJson ?? false, this._clientOptions.retryOptions ); } diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index 3d8714980b5..b28b8d91726 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -181,6 +181,8 @@ export class MessageSession extends LinkEntity { private _totalAutoLockRenewDuration: number; + private skipParsingBodyAsJson: boolean; + public get receiverHelper(): ReceiverHelper { return this._receiverHelper; } @@ -376,6 +378,7 @@ export class MessageSession extends LinkEntity { this.autoComplete = false; if (isDefined(this._providedSessionId)) this.sessionId = this._providedSessionId; this.receiveMode = options.receiveMode || "peekLock"; + this.skipParsingBodyAsJson = options.skipParsingBodyAsJson; this.maxAutoRenewDurationInMs = options.maxAutoLockRenewalDurationInMs != null ? options.maxAutoLockRenewalDurationInMs @@ -391,7 +394,7 @@ export class MessageSession extends LinkEntity { return this.link!; }, this.receiveMode, - options.skipParsingBodyAsJson + this.skipParsingBodyAsJson ); // setting all the handlers @@ -631,7 +634,7 @@ export class MessageSession extends LinkEntity { context.delivery!, true, this.receiveMode, - options?.skipParsingBodyAsJson ?? false + this.skipParsingBodyAsJson ); try { diff --git a/sdk/servicebus/service-bus/test/internal/retries.spec.ts b/sdk/servicebus/service-bus/test/internal/retries.spec.ts index aa4aec9c223..067a342b14e 100644 --- a/sdk/servicebus/service-bus/test/internal/retries.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/retries.spec.ts @@ -353,7 +353,8 @@ describe("Retries - Receive methods", () => { "dummyEntityPath", { lockRenewer: undefined, - receiveMode: "peekLock" + receiveMode: "peekLock", + skipParsingBodyAsJson: false } ); batchingReceiver.isOpen = () => true; diff --git a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts index 84253ff603c..c48c9b3ead9 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts @@ -29,7 +29,8 @@ import { ReceiveMode } from "../../../src/models"; describe("AbortSignal", () => { const defaultOptions = { lockRenewer: undefined, - receiveMode: "peekLock" + receiveMode: "peekLock", + skipParsingBodyAsJson: false }; const testMessageThatDoesntMatter = { @@ -398,7 +399,8 @@ describe("AbortSignal", () => { createConnectionContextForTests(), "entityPath", "peekLock", - 1 + 1, + false ); try { diff --git a/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts index b3dcc870b4a..2a28109ba89 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts @@ -53,7 +53,8 @@ describe("BatchingReceiver unit tests", () => { createConnectionContextForTests(), "fakeEntityPath", "peekLock", - 1 + 1, + false ); let wasCalled = false; @@ -86,7 +87,8 @@ describe("BatchingReceiver unit tests", () => { const receiver = new BatchingReceiver(createConnectionContextForTests(), "fakeEntityPath", { receiveMode: "peekLock", - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false }); try { @@ -105,7 +107,8 @@ describe("BatchingReceiver unit tests", () => { const receiver = new BatchingReceiver(createConnectionContextForTests(), "fakeEntityPath", { receiveMode: "peekLock", - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false }); closeables.push(receiver); @@ -193,7 +196,8 @@ describe("BatchingReceiver unit tests", () => { "dummyEntityPath", { receiveMode: lockMode, - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false } ); closeables.push(batchingReceiver); @@ -225,7 +229,8 @@ describe("BatchingReceiver unit tests", () => { "dummyEntityPath", { receiveMode: lockMode, - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false } ); closeables.push(receiver); @@ -257,7 +262,8 @@ describe("BatchingReceiver unit tests", () => { "dummyEntityPath", { receiveMode: lockMode, - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false } ); closeables.push(batchingReceiver); @@ -305,7 +311,8 @@ describe("BatchingReceiver unit tests", () => { "dummyEntityPath", { receiveMode: lockMode, - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false } ); closeables.push(batchingReceiver); @@ -359,7 +366,8 @@ describe("BatchingReceiver unit tests", () => { "dummyEntityPath", { receiveMode: lockMode, - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false } ); closeables.push(batchingReceiver); diff --git a/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts index 6350fe05ddd..add12f6f16a 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts @@ -348,6 +348,7 @@ describe("LinkEntity unit tests", () => { abortSignal: undefined, lockRenewer: undefined, receiveMode: "receiveAndDelete", + skipParsingBodyAsJson: false, tracingOptions: {} }); @@ -371,6 +372,7 @@ describe("LinkEntity unit tests", () => { abortSignal: undefined, lockRenewer: undefined, receiveMode: "receiveAndDelete", + skipParsingBodyAsJson: false, tracingOptions: {} }); diff --git a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts index d33d60ee4b1..30b5d63c52b 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts @@ -31,7 +31,8 @@ describe("Receiver unit tests", () => { "fakeEntityPath", { lockRenewer: undefined, - receiveMode: "peekLock" + receiveMode: "peekLock", + skipParsingBodyAsJson: false } ); @@ -58,7 +59,8 @@ describe("Receiver unit tests", () => { "fakeEntityPath", { lockRenewer: undefined, - receiveMode: "peekLock" + receiveMode: "peekLock", + skipParsingBodyAsJson: false } ); @@ -99,7 +101,8 @@ describe("Receiver unit tests", () => { createConnectionContextForTests(), "fakeEntityPath", "peekLock", - 1 + 1, + false ); const subscription = await subscribeAndWaitForInitialize(receiverImpl); @@ -137,7 +140,8 @@ describe("Receiver unit tests", () => { }), "fakeEntityPath", "peekLock", - 1 + 1, + false ); const subscription = await subscribeAndWaitForInitialize(receiverImpl); @@ -168,7 +172,8 @@ describe("Receiver unit tests", () => { createConnectionContextForTests(), "fakeEntityPath", "peekLock", - 1 + 1, + false ); const abortSignal = { @@ -211,7 +216,8 @@ describe("Receiver unit tests", () => { createConnectionContextForTests(), "entity path", "peekLock", - 1 + 1, + false ); const abortSignal = createAbortSignalForTest(true); @@ -277,7 +283,7 @@ describe("Receiver unit tests", () => { it("create() with an existing _streamingReceiver", async () => { const context = createConnectionContextForTests(); - impl = new ServiceBusReceiverImpl(context, "entity path", "peekLock", 1); + impl = new ServiceBusReceiverImpl(context, "entity path", "peekLock", 1, false); const existingStreamingReceiver = createStreamingReceiver("entityPath"); const subscribeStub = sinon.spy(existingStreamingReceiver, "subscribe"); @@ -304,7 +310,7 @@ describe("Receiver unit tests", () => { it("create() with an existing receiver and that receiver is NOT open()", async () => { const context = createConnectionContextForTests(); - impl = new ServiceBusReceiverImpl(context, "entity path", "peekLock", 1); + impl = new ServiceBusReceiverImpl(context, "entity path", "peekLock", 1, false); await subscribeAndWaitForInitialize(impl); diff --git a/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts index 064f225d829..08e41fd9e7b 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts @@ -17,7 +17,8 @@ describe("ServiceBusReceiver unit tests", () => { createConnectionContextForTests(), "entityPath", "peekLock", - 0 + 0, + false ); }); diff --git a/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts index c8a0f8139a2..e395f6e9aac 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts @@ -73,7 +73,8 @@ describe("StreamingReceiver unit tests", () => { it("errors thrown from the user's callback are marked as 'processMessageCallback' errors", async () => { const streamingReceiver = createTestStreamingReceiver("entity path", { lockRenewer: undefined, - receiveMode: "receiveAndDelete" + receiveMode: "receiveAndDelete", + skipParsingBodyAsJson: false }); try { @@ -239,7 +240,8 @@ describe("StreamingReceiver unit tests", () => { it("_setMessageHandlers", async () => { const streamingReceiver = createTestStreamingReceiver("entitypath", { lockRenewer: undefined, - receiveMode: "peekLock" + receiveMode: "peekLock", + skipParsingBodyAsJson: false }); let processErrorMessages: string[] = []; diff --git a/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts index fdcb9d84171..2a6eeac6774 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts @@ -236,7 +236,13 @@ describe("Tracing tests", () => { * to validate tracing. */ [ - new ServiceBusReceiverImpl(createConnectionContextForTests(), "entity path", "peekLock", 1), + new ServiceBusReceiverImpl( + createConnectionContextForTests(), + "entity path", + "peekLock", + 1, + false + ), new ServiceBusSessionReceiverImpl( {} as MessageSession, createConnectionContextForTests(), diff --git a/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts b/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts index 67cf7eab4c9..765d9729be0 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts @@ -283,7 +283,8 @@ export function addTestStreamingReceiver() { options = { lockRenewer: undefined, receiveMode: "peekLock", - maxConcurrentCalls: 101 + maxConcurrentCalls: 101, + skipParsingBodyAsJson: false }; } From 9ec54f11f1ab600d542d2dc844a6cdedf1cb22dc Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Mon, 22 Nov 2021 17:04:11 -0800 Subject: [PATCH 3/7] Remove commented code that was trying to decode again but the message body is already decoded, and _rawAmqpMessage is assigned in `fromRheaMessage()` --- .../service-bus/src/serviceBusMessage.ts | 22 +------------------ 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index 94d949a42c8..12bca0f3341 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -885,33 +885,13 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage { delivery, shouldReorderLockToken ); + this._rawAmqpMessage = _rawAmqpMessage; // need to initialize _rawAmqpMessage property to make compiler happy Object.assign(this, restOfMessageProps); // Lock on a message is applicable only in peekLock mode, but the service sets // the lock token even in receiveAndDelete mode if the entity in question is partitioned. if (receiveMode === "receiveAndDelete") { this.lockToken = undefined; } - - // let actualBodyType: - // | ReturnType["bodyType"] - // | undefined = undefined; - - // if (msg.body) { - // try { - // const result = defaultDataTransformer.decodeWithType(msg.body); - - // this.body = result.body; - // actualBodyType = result.bodyType; - // } catch (err) { - // this.body = undefined; - // } - // } - // why above when `fromRheaMessage()` already called `defaultDataTransformer.decodeWithType()` earlier on message body - this.body = restOfMessageProps.body; - - this._rawAmqpMessage = _rawAmqpMessage; - // `_rawAmqpMessage.bodyType` is also assigned already in `fromRheaMessage()` - // this._rawAmqpMessage.bodyType = actualBodyType; this.delivery = delivery; } From 753bae920aaba850c5c7696c7ff319f60344b31f Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Wed, 8 Dec 2021 14:41:40 -0800 Subject: [PATCH 4/7] Remove double decoding message.body has been decoded already above by `fromRheaMessage()` call. --- sdk/servicebus/service-bus/src/core/managementClient.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index ea5f0286339..be319e47365 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -508,11 +508,6 @@ export class ManagementClient extends LinkEntity { decodedMessage as any, options?.skipParsingBodyAsJson ?? false ); - - message.body = defaultDataTransformer.decode( - message.body, - options?.skipParsingBodyAsJson ?? false - ); messageList.push(message); this._lastPeekedSequenceNumber = message.sequenceNumber!; } From 09d8334d90e92f2a789f6c5569f532df9e2f62d4 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Wed, 8 Dec 2021 14:50:11 -0800 Subject: [PATCH 5/7] Update CHANGELOG and package version --- sdk/servicebus/service-bus/CHANGELOG.md | 4 +++- sdk/servicebus/service-bus/package.json | 2 +- sdk/servicebus/service-bus/src/util/constants.ts | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index ca1330fb3bf..d605cd2c42f 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -1,9 +1,11 @@ # Release History -## 7.4.1 (Unreleased) +## 7.5.0-beta.1 (Unreleased) ### Features Added +- Add optional boolean `skipParsingBodyAsJson` property to `ServiceBusReceiverOptions` and `ServiceBusSessionReceiverOptions`. This allows users to control whether the SDK should skip parsing message body as Json object. By default, the SDK will attempt to parse message body as Json object. [PR #18692](https://github.com/Azure/azure-sdk-for-js/pull/18692) + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/servicebus/service-bus/package.json b/sdk/servicebus/service-bus/package.json index 37cc2a8ab19..0fe895304c6 100644 --- a/sdk/servicebus/service-bus/package.json +++ b/sdk/servicebus/service-bus/package.json @@ -2,7 +2,7 @@ "name": "@azure/service-bus", "sdk-type": "client", "author": "Microsoft Corporation", - "version": "7.4.1", + "version": "7.5.0-beta.1", "license": "MIT", "description": "Azure Service Bus SDK for JavaScript", "homepage": "https://github.com/Azure/azure-sdk-for-js/tree/main/sdk/servicebus/service-bus/", diff --git a/sdk/servicebus/service-bus/src/util/constants.ts b/sdk/servicebus/service-bus/src/util/constants.ts index c014283e99f..39f0295a7fd 100644 --- a/sdk/servicebus/service-bus/src/util/constants.ts +++ b/sdk/servicebus/service-bus/src/util/constants.ts @@ -6,7 +6,7 @@ */ export const packageJsonInfo = { name: "@azure/service-bus", - version: "7.4.1" + version: "7.5.0-beta.1" }; /** From 4133ee3a7dbcddaf2b89cf10991ce1b338c15cb3 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Wed, 8 Dec 2021 15:08:00 -0800 Subject: [PATCH 6/7] Add comments --- sdk/servicebus/service-bus/src/core/messageReceiver.ts | 5 +++++ sdk/servicebus/service-bus/src/session/messageSession.ts | 3 +++ 2 files changed, 8 insertions(+) diff --git a/sdk/servicebus/service-bus/src/core/messageReceiver.ts b/sdk/servicebus/service-bus/src/core/messageReceiver.ts index 89ae4f98044..7a02624886a 100644 --- a/sdk/servicebus/service-bus/src/core/messageReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/messageReceiver.ts @@ -51,6 +51,11 @@ export interface ReceiveOptions extends SubscribeOptions { * maxAutoRenewLockDurationInMs value when they created their receiver. */ lockRenewer: LockRenewer | undefined; + /** + * Option to disable the client from running JSON.parse() on the message body when receiving the message. + * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you + * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. + */ skipParsingBodyAsJson: boolean; } diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index b28b8d91726..2bed1e33e05 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -181,6 +181,9 @@ export class MessageSession extends LinkEntity { private _totalAutoLockRenewDuration: number; + /** + * Whether to prevent the client from running JSON.parse() on the message body when receiving the message. + */ private skipParsingBodyAsJson: boolean; public get receiverHelper(): ReceiverHelper { From bc3d8485ee914fb7a1debe103dba5e7ff3fcb8a2 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 10 Dec 2021 10:52:02 -0800 Subject: [PATCH 7/7] Update sdk/servicebus/service-bus/CHANGELOG.md Co-authored-by: Deyaaeldeen Almahallawi --- sdk/servicebus/service-bus/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 9c456c03bb9..3effd347d2a 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -5,7 +5,7 @@ ### Features Added - Add `state` property to `ServiceBusReceivedMessage`. Its value is one of `"active"`, `"deferred"`, or `"scheduled"`. [PR #18938](https://github.com/Azure/azure-sdk-for-js/pull/18938) -- Add optional boolean `skipParsingBodyAsJson` property to `ServiceBusReceiverOptions` and `ServiceBusSessionReceiverOptions`. This allows users to control whether the SDK should skip parsing message body as Json object. By default, the SDK will attempt to parse message body as Json object. [PR #18692](https://github.com/Azure/azure-sdk-for-js/pull/18692) +- Add optional boolean `skipParsingBodyAsJson` property to `ServiceBusReceiverOptions` and `ServiceBusSessionReceiverOptions`. By default, the client attempts to parse message body as JSON object, and this new parameter controls whether the client should skip performing this parsing. [PR #18692](https://github.com/Azure/azure-sdk-for-js/pull/18692) ### Breaking Changes