Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[@azure/event-hubs] Adding disableDeserialization option when subscribing #18173

Merged
merged 14 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Release History

## 5.6.1 (Unreleased)
## 5.7.0 (Unreleased)

### Features Added
- Added `skipParsingBodyAsJson` optional parameter to `EventHubConsumerClient.subscribe` method. When set to `true` it will disable the client from running `JSON.parse()` on the message body when receiving the message. Not applicable if the message was sent with AMQP body type `value` or `sequence`.

### Breaking Changes

Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/event-hubs",
"sdk-type": "client",
"version": "5.6.1",
"version": "5.7.0",
"description": "Azure Event Hubs SDK for JS.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ export interface SubscribeOptions {
maxBatchSize?: number;
maxWaitTimeInSeconds?: number;
ownerLevel?: number;
skipParsingBodyAsJson?: boolean;
startPosition?: EventPosition | {
[partitionId: string]: EventPosition;
};
Expand Down
13 changes: 10 additions & 3 deletions sdk/eventhub/event-hubs/src/dataTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,29 @@ export const defaultDataTransformer = {
* indicating which part of the AMQP message the body was decoded from.
*
* @param body - The AMQP message body as received from rhea.
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body when body type is `content`.
* @returns The decoded/raw body and the body type.
*/
decode(body: unknown | RheaAmqpSection): { body: unknown; bodyType: BodyTypes } {
decode(
body: unknown | RheaAmqpSection,
skipParsingBodyAsJson: boolean
): { body: unknown; bodyType: BodyTypes } {
try {
if (isRheaAmqpSection(body)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR:

@richardpark-msft, @HarshaNalluru Do you recall what is the scenario where we will be trying to decode something that is not a rhea amqp section? This isRheaAmqpSection method was introduced in #15939, but the idea of "lets try decoding body.content if it exists, otherwise decode body itself" was present before as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logged #18216 for follow up here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idea of "lets try decoding body.content if it exists, otherwise decode body itself" was present before as well

It was added long long ago and we never touched it I believe, and always retained the piece of code.

For both Event Hubs and Service Bus.
image

Do you recall what is the scenario where we will be trying to decode something that is not a rhea amqp section?

image
Body is not a rhea amqp section if the body is undefined or the typecode is bad somehow.
It feels like it just means we don't understand the body/content, and as the last line of defense, we'll do whatever we used to do before instead of throwing an error.

switch (body.typecode) {
case dataSectionTypeCode:
return { body: tryToJsonDecode(body.content), bodyType: "data" };
return {
body: skipParsingBodyAsJson ? body.content : tryToJsonDecode(body.content),
bodyType: "data"
};
case sequenceSectionTypeCode:
return { body: body.content, bodyType: "sequence" };
case valueSectionTypeCode:
return { body: body.content, bodyType: "value" };
}
} else {
if (isBuffer(body)) {
return { body: tryToJsonDecode(body), bodyType: "data" };
return { body: skipParsingBodyAsJson ? body : tryToJsonDecode(body), bodyType: "data" };
}

return { body, bodyType: "value" };
Expand Down
8 changes: 6 additions & 2 deletions sdk/eventhub/event-hubs/src/eventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,15 @@ const messagePropertiesMap = {
/**
* Converts the AMQP message to an EventData.
* @param msg - The AMQP message that needs to be converted to EventData.
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body when body type is `content`.
* @hidden
*/
export function fromRheaMessage(msg: RheaMessage): EventDataInternal {
export function fromRheaMessage(
msg: RheaMessage,
skipParsingBodyAsJson: boolean
): EventDataInternal {
const rawMessage = AmqpAnnotatedMessage.fromRheaMessage(msg);
const { body, bodyType } = defaultDataTransformer.decode(msg.body);
const { body, bodyType } = defaultDataTransformer.decode(msg.body, skipParsingBodyAsJson);
rawMessage.bodyType = bodyType;

const data: EventDataInternal = {
Expand Down
6 changes: 6 additions & 0 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClientModels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ export interface SubscribeOptions {
* Options for configuring tracing.
*/
tracingOptions?: OperationTracingOptions;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down
6 changes: 4 additions & 2 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,10 @@ export class EventHubReceiver extends LinkEntity {
if (!context.message) {
return;
}

const data: EventDataInternal = fromRheaMessage(context.message);
const data: EventDataInternal = fromRheaMessage(
context.message,
!!this.options.skipParsingBodyAsJson
);
const rawMessage = data.getRawAmqpMessage();
const receivedEventData: ReceivedEventData = {
body: data.body,
Expand Down
12 changes: 11 additions & 1 deletion sdk/eventhub/event-hubs/src/models/private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,18 @@ export interface CommonEventProcessorOptions
* consumers to fail if their `ownerLevel` is lower or doesn't exist.
* - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events.
* A simple usage can be `{ "maxRetries": 4 }`.
* - `skipParsingBodyAsJson` : Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you prefer to work directly with
* the bytes present in the message body than have the client attempt to parse it.
*
* Example usage:
* ```js
* {
* retryOptions: {
* maxRetries: 4
* },
* trackLastEnqueuedEventProperties: false
* trackLastEnqueuedEventProperties: false,
* skipParsingBodyAsJson: true
* }
* ```
* @internal
Expand Down Expand Up @@ -113,4 +117,10 @@ export interface EventHubConsumerOptions {
* against periodically making requests for partition properties using the Event Hub client.
*/
trackLastEnqueuedEventProperties?: boolean;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/src/partitionPump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ export class PartitionPump {
{
ownerLevel: this._processorOptions.ownerLevel,
trackLastEnqueuedEventProperties: this._processorOptions.trackLastEnqueuedEventProperties,
retryOptions: this._processorOptions.retryOptions
retryOptions: this._processorOptions.retryOptions,
skipParsingBodyAsJson: this._processorOptions.skipParsingBodyAsJson
}
);

Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/util/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
*/
export const packageJsonInfo = {
name: "@azure/event-hubs",
version: "5.6.1"
version: "5.7.0"
};
26 changes: 16 additions & 10 deletions sdk/eventhub/event-hubs/test/internal/amqp.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,29 @@ testWithServiceTypes(() => {
assert.isFalse(isAmqpAnnotatedMessage({ body: "hello world" }));
assert.isFalse(
isAmqpAnnotatedMessage(
fromRheaMessage({
message_annotations: {
[Constants.enqueuedTime]: Date.now()
fromRheaMessage(
{
message_annotations: {
[Constants.enqueuedTime]: Date.now()
},
body: undefined
},
body: undefined
})
false
)
)
);

assert.isTrue(
isAmqpAnnotatedMessage(
fromRheaMessage({
message_annotations: {
[Constants.enqueuedTime]: Date.now()
fromRheaMessage(
{
message_annotations: {
[Constants.enqueuedTime]: Date.now()
},
body: undefined
},
body: undefined
}).getRawAmqpMessage()
false
).getRawAmqpMessage()
)
);

Expand Down
Loading