Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Add optional boolean skipParsingBodyAsJson option #18692

Merged
merged 10 commits into from
Dec 10, 2021
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ export interface ServiceBusReceiver {
export interface ServiceBusReceiverOptions {
maxAutoLockRenewalDurationInMs?: number;
receiveMode?: "peekLock" | "receiveAndDelete";
skipParsingBodyAsJson?: boolean;
subQueueType?: "deadLetter" | "transferDeadLetter";
}

Expand Down Expand Up @@ -488,6 +489,7 @@ export interface ServiceBusSessionReceiver extends ServiceBusReceiver {
export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase {
maxAutoLockRenewalDurationInMs?: number;
receiveMode?: "peekLock" | "receiveAndDelete";
skipParsingBodyAsJson?: boolean;
}

// @public
Expand Down
9 changes: 6 additions & 3 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ export class BatchingReceiver extends MessageReceiver {

return this.link;
},
this.receiveMode
this.receiveMode,
options.skipParsingBodyAsJson ?? false
);
}

Expand Down Expand Up @@ -248,7 +249,8 @@ export class BatchingReceiverLite {
private _getCurrentReceiver: (
abortSignal?: AbortSignalLike
) => Promise<MinimalReceiver | undefined>,
private _receiveMode: ReceiveMode
private _receiveMode: ReceiveMode,
_skipParsingBodyAsJson: boolean
) {
this._createAndEndProcessingSpan = createAndEndProcessingSpan;

Expand All @@ -257,7 +259,8 @@ export class BatchingReceiverLite {
context.message!,
context.delivery!,
true,
this._receiveMode
this._receiveMode,
_skipParsingBodyAsJson
);
};

Expand Down
19 changes: 16 additions & 3 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ export interface SendManagementRequestOptions extends SendRequestOptions {
* This is used for service side optimization.
*/
associatedLinkName?: string;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down Expand Up @@ -499,9 +505,15 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
const messages = result.body.messages as { message: Buffer }[];
for (const msg of messages) {
const decodedMessage = RheaMessageUtil.decode(msg.message);
const message = fromRheaMessage(decodedMessage as any);
const message = fromRheaMessage(
decodedMessage as any,
options?.skipParsingBodyAsJson ?? false
jeremymeng marked this conversation as resolved.
Show resolved Hide resolved
);

message.body = defaultDataTransformer.decode(message.body);
message.body = defaultDataTransformer.decode(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is existing code, but can you check if we even need this step? Shouldnt fromRheaMessage() already call defaultDataTransformer.decode() ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, it feels like we should not have this option for management client, and should always either try or skip?

message.body,
options?.skipParsingBodyAsJson ?? false
);
messageList.push(message);
this._lastPeekedSequenceNumber = message.sequenceNumber!;
}
Expand Down Expand Up @@ -813,7 +825,8 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
decodedMessage as any,
{ tag: msg["lock-token"] } as any,
false,
receiveMode
receiveMode,
options?.skipParsingBodyAsJson ?? false
);
messageList.push(message);
}
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export interface ReceiveOptions extends SubscribeOptions {
* maxAutoRenewLockDurationInMs value when they created their receiver.
*/
lockRenewer: LockRenewer | undefined;
skipParsingBodyAsJson: boolean;
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ export class StreamingReceiver extends MessageReceiver {
context.message!,
context.delivery!,
true,
this.receiveMode
this.receiveMode,
options.skipParsingBodyAsJson ?? false
);

this._lockRenewer?.start(this, bMessage, (err) => {
Expand Down
16 changes: 11 additions & 5 deletions sdk/servicebus/service-bus/src/dataTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,17 @@ export const defaultDataTransformer = {
* of the AMQP mesage.
*
* @param body - The AMQP message body
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body content.
* @returns decoded body or the given body as-is.
*/
decode(body: unknown): unknown {
decode(body: unknown, skipParsingBodyAsJson: boolean): unknown {
let actualContent = body;

if (isRheaAmqpSection(body)) {
actualContent = body.content;
}

return tryToJsonDecode(actualContent);
return skipParsingBodyAsJson ? actualContent : tryToJsonDecode(actualContent);
},
/**
* A function that takes the body property from an AMQP message, which can come from either
Expand All @@ -103,16 +104,21 @@ export const defaultDataTransformer = {
* indicating which part of the AMQP message the body was decoded from.
*
* @param body - The AMQP message body as received from rhea.
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body.
* @returns The decoded/raw body and the body type.
*/
decodeWithType(
body: unknown | RheaAmqpSection
body: unknown | RheaAmqpSection,
skipParsingBodyAsJson: boolean
): { body: unknown; bodyType: "data" | "sequence" | "value" } {
try {
if (isRheaAmqpSection(body)) {
switch (body.typecode) {
case dataSectionTypeCode:
return { body: tryToJsonDecode(body.content), bodyType: "data" };
return {
body: skipParsingBodyAsJson ? body.content : tryToJsonDecode(body.content),
bodyType: "data"
};
case sequenceSectionTypeCode:
// typecode:
// handle sequences
Expand All @@ -125,7 +131,7 @@ export const defaultDataTransformer = {
// not sure - we have to try to infer the proper bodyType and content
if (isBuffer(body)) {
// This indicates that we are getting the AMQP described type. Let us try decoding it.
return { body: tryToJsonDecode(body), bodyType: "data" };
return { body: skipParsingBodyAsJson ? body : tryToJsonDecode(body), bodyType: "data" };
} else {
return { body: body, bodyType: "value" };
}
Expand Down
12 changes: 12 additions & 0 deletions sdk/servicebus/service-bus/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ export interface ServiceBusReceiverOptions {
* - **To disable autolock renewal**, set this to `0`.
*/
maxAutoLockRenewalDurationInMs?: number;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down Expand Up @@ -233,6 +239,12 @@ export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase {
* - **To disable autolock renewal**, set this to `0`.
*/
maxAutoLockRenewalDurationInMs?: number;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down
7 changes: 5 additions & 2 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
public entityPath: string,
public receiveMode: "peekLock" | "receiveAndDelete",
maxAutoRenewLockDurationInMs: number,
private skipParsingBodyAsJson: boolean,
retryOptions: RetryOptions = {}
) {
throwErrorIfConnectionClosed(_context);
Expand Down Expand Up @@ -357,7 +358,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
const receiveOptions: ReceiveOptions = {
maxConcurrentCalls: 0,
receiveMode: this.receiveMode,
lockRenewer: this._lockRenewer
lockRenewer: this._lockRenewer,
skipParsingBodyAsJson: this.skipParsingBodyAsJson
};
this._batchingReceiver = this._createBatchingReceiver(
this._context,
Expand Down Expand Up @@ -507,7 +509,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
...options,
receiveMode: this.receiveMode,
retryOptions: this._retryOptions,
lockRenewer: this._lockRenewer
lockRenewer: this._lockRenewer,
skipParsingBodyAsJson: this.skipParsingBodyAsJson
});

// this ensures that if the outer service bus client is closed that this receiver is cleaned up.
Expand Down
7 changes: 5 additions & 2 deletions sdk/servicebus/service-bus/src/serviceBusClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ export class ServiceBusClient {
entityPathWithSubQueue,
receiveMode,
maxLockAutoRenewDurationInMs,
options?.skipParsingBodyAsJson ?? false,
this._clientOptions.retryOptions
);
}
Expand Down Expand Up @@ -321,7 +322,8 @@ export class ServiceBusClient {
maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs,
receiveMode,
abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions
retryOptions: this._clientOptions.retryOptions,
skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false
}
);

Expand Down Expand Up @@ -406,7 +408,8 @@ export class ServiceBusClient {
maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs,
receiveMode,
abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions
retryOptions: this._clientOptions.retryOptions,
skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false
}
);

Expand Down
28 changes: 9 additions & 19 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ export interface ServiceBusReceivedMessage extends ServiceBusMessage {
*/
export function fromRheaMessage(
rheaMessage: RheaMessage,
skipParsingBodyAsJson: boolean,
delivery?: Delivery,
shouldReorderLockToken?: boolean
): ServiceBusReceivedMessage {
Expand All @@ -516,7 +517,10 @@ export function fromRheaMessage(
};
}

const { body, bodyType } = defaultDataTransformer.decodeWithType(rheaMessage.body);
const { body, bodyType } = defaultDataTransformer.decodeWithType(
rheaMessage.body,
skipParsingBodyAsJson
);

const sbmsg: ServiceBusMessage = {
body: body
Expand Down Expand Up @@ -874,36 +878,22 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage {
msg: RheaMessage,
delivery: Delivery,
shouldReorderLockToken: boolean,
receiveMode: ReceiveMode
receiveMode: ReceiveMode,
skipParsingBodyAsJson: boolean
) {
const { _rawAmqpMessage, ...restOfMessageProps } = fromRheaMessage(
msg,
skipParsingBodyAsJson,
delivery,
shouldReorderLockToken
);
this._rawAmqpMessage = _rawAmqpMessage; // need to initialize _rawAmqpMessage property to make compiler happy
Object.assign(this, restOfMessageProps);
// Lock on a message is applicable only in peekLock mode, but the service sets
// the lock token even in receiveAndDelete mode if the entity in question is partitioned.
if (receiveMode === "receiveAndDelete") {
this.lockToken = undefined;
}

let actualBodyType:
| ReturnType<typeof defaultDataTransformer["decodeWithType"]>["bodyType"]
| undefined = undefined;

if (msg.body) {
try {
const result = defaultDataTransformer.decodeWithType(msg.body);

this.body = result.body;
actualBodyType = result.bodyType;
} catch (err) {
this.body = undefined;
}
}
this._rawAmqpMessage = _rawAmqpMessage;
this._rawAmqpMessage.bodyType = actualBodyType;
this.delivery = delivery;
}

Expand Down
10 changes: 8 additions & 2 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export type MessageSessionOptions = Pick<
> & {
receiveMode?: ReceiveMode;
retryOptions: RetryOptions | undefined;
skipParsingBodyAsJson: boolean;
};

/**
Expand Down Expand Up @@ -180,6 +181,8 @@ export class MessageSession extends LinkEntity<Receiver> {

private _totalAutoLockRenewDuration: number;

private skipParsingBodyAsJson: boolean;

public get receiverHelper(): ReceiverHelper {
return this._receiverHelper;
}
Expand Down Expand Up @@ -375,6 +378,7 @@ export class MessageSession extends LinkEntity<Receiver> {
this.autoComplete = false;
if (isDefined(this._providedSessionId)) this.sessionId = this._providedSessionId;
this.receiveMode = options.receiveMode || "peekLock";
this.skipParsingBodyAsJson = options.skipParsingBodyAsJson;
this.maxAutoRenewDurationInMs =
options.maxAutoLockRenewalDurationInMs != null
? options.maxAutoLockRenewalDurationInMs
Expand All @@ -389,7 +393,8 @@ export class MessageSession extends LinkEntity<Receiver> {
async (_abortSignal?: AbortSignalLike): Promise<MinimalReceiver> => {
return this.link!;
},
this.receiveMode
this.receiveMode,
this.skipParsingBodyAsJson
);

// setting all the handlers
Expand Down Expand Up @@ -628,7 +633,8 @@ export class MessageSession extends LinkEntity<Receiver> {
context.message!,
context.delivery!,
true,
this.receiveMode
this.receiveMode,
this.skipParsingBodyAsJson
);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ describe("Ensure typescript samples use published package", function(): void {
}

it("Ensure TypeScript samples use published package", async () => {
const pattern = "samples/typescript/src/**/*.ts";
const pattern = "samples/v7/typescript/src/**/*.ts";
const files = await globAsync(pattern);
testSamples(files, new RegExp('from\\s"@azure/service-bus"'));
});

it("Ensure JavaScript samples use published package", async () => {
const pattern = "samples/javascript/**/*.js";
const pattern = "samples/v7/javascript/**/*.js";
const files = await globAsync(pattern);
testSamples(files, new RegExp('=\\srequire\\("@azure/service-bus"\\)'));
});
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/test/internal/retries.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ describe("Retries - Receive methods", () => {
"dummyEntityPath",
{
lockRenewer: undefined,
receiveMode: "peekLock"
receiveMode: "peekLock",
skipParsingBodyAsJson: false
}
);
batchingReceiver.isOpen = () => true;
Expand Down
Loading