Skip to content

Commit

Permalink
[@Azure/event-hubs] Adding disableDeserialization option when subsc…
Browse files Browse the repository at this point in the history
…ribing (#18173)
marcodalessandro authored Oct 28, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent f8a3be7 commit 8fcdbfe
Showing 13 changed files with 331 additions and 153 deletions.
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Release History

## 5.6.1 (Unreleased)
## 5.7.0 (Unreleased)

### Features Added
- Added `skipParsingBodyAsJson` optional parameter to `EventHubConsumerClient.subscribe` method. When set to `true` it will disable the client from running `JSON.parse()` on the message body when receiving the message. Not applicable if the message was sent with AMQP body type `value` or `sequence`.

### Breaking Changes

2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/event-hubs",
"sdk-type": "client",
"version": "5.6.1",
"version": "5.7.0",
"description": "Azure Event Hubs SDK for JS.",
"author": "Microsoft Corporation",
"license": "MIT",
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
@@ -277,6 +277,7 @@ export interface SubscribeOptions {
maxBatchSize?: number;
maxWaitTimeInSeconds?: number;
ownerLevel?: number;
skipParsingBodyAsJson?: boolean;
startPosition?: EventPosition | {
[partitionId: string]: EventPosition;
};
13 changes: 10 additions & 3 deletions sdk/eventhub/event-hubs/src/dataTransformer.ts
Original file line number Diff line number Diff line change
@@ -75,22 +75,29 @@ export const defaultDataTransformer = {
* indicating which part of the AMQP message the body was decoded from.
*
* @param body - The AMQP message body as received from rhea.
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body when body type is `content`.
* @returns The decoded/raw body and the body type.
*/
decode(body: unknown | RheaAmqpSection): { body: unknown; bodyType: BodyTypes } {
decode(
body: unknown | RheaAmqpSection,
skipParsingBodyAsJson: boolean
): { body: unknown; bodyType: BodyTypes } {
try {
if (isRheaAmqpSection(body)) {
switch (body.typecode) {
case dataSectionTypeCode:
return { body: tryToJsonDecode(body.content), bodyType: "data" };
return {
body: skipParsingBodyAsJson ? body.content : tryToJsonDecode(body.content),
bodyType: "data"
};
case sequenceSectionTypeCode:
return { body: body.content, bodyType: "sequence" };
case valueSectionTypeCode:
return { body: body.content, bodyType: "value" };
}
} else {
if (isBuffer(body)) {
return { body: tryToJsonDecode(body), bodyType: "data" };
return { body: skipParsingBodyAsJson ? body : tryToJsonDecode(body), bodyType: "data" };
}

return { body, bodyType: "value" };
8 changes: 6 additions & 2 deletions sdk/eventhub/event-hubs/src/eventData.ts
Original file line number Diff line number Diff line change
@@ -156,11 +156,15 @@ const messagePropertiesMap = {
/**
* Converts the AMQP message to an EventData.
* @param msg - The AMQP message that needs to be converted to EventData.
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body when body type is `content`.
* @hidden
*/
export function fromRheaMessage(msg: RheaMessage): EventDataInternal {
export function fromRheaMessage(
msg: RheaMessage,
skipParsingBodyAsJson: boolean
): EventDataInternal {
const rawMessage = AmqpAnnotatedMessage.fromRheaMessage(msg);
const { body, bodyType } = defaultDataTransformer.decode(msg.body);
const { body, bodyType } = defaultDataTransformer.decode(msg.body, skipParsingBodyAsJson);
rawMessage.bodyType = bodyType;

const data: EventDataInternal = {
6 changes: 6 additions & 0 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClientModels.ts
Original file line number Diff line number Diff line change
@@ -198,6 +198,12 @@ export interface SubscribeOptions {
* Options for configuring tracing.
*/
tracingOptions?: OperationTracingOptions;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
6 changes: 4 additions & 2 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
@@ -221,8 +221,10 @@ export class EventHubReceiver extends LinkEntity {
if (!context.message) {
return;
}

const data: EventDataInternal = fromRheaMessage(context.message);
const data: EventDataInternal = fromRheaMessage(
context.message,
!!this.options.skipParsingBodyAsJson
);
const rawMessage = data.getRawAmqpMessage();
const receivedEventData: ReceivedEventData = {
body: data.body,
12 changes: 11 additions & 1 deletion sdk/eventhub/event-hubs/src/models/private.ts
Original file line number Diff line number Diff line change
@@ -76,14 +76,18 @@ export interface CommonEventProcessorOptions
* consumers to fail if their `ownerLevel` is lower or doesn't exist.
* - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events.
* A simple usage can be `{ "maxRetries": 4 }`.
* - `skipParsingBodyAsJson` : Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you prefer to work directly with
* the bytes present in the message body than have the client attempt to parse it.
*
* Example usage:
* ```js
* {
* retryOptions: {
* maxRetries: 4
* },
* trackLastEnqueuedEventProperties: false
* trackLastEnqueuedEventProperties: false,
* skipParsingBodyAsJson: true
* }
* ```
* @internal
@@ -113,4 +117,10 @@ export interface EventHubConsumerOptions {
* against periodically making requests for partition properties using the Event Hub client.
*/
trackLastEnqueuedEventProperties?: boolean;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/src/partitionPump.ts
Original file line number Diff line number Diff line change
@@ -90,7 +90,8 @@ export class PartitionPump {
{
ownerLevel: this._processorOptions.ownerLevel,
trackLastEnqueuedEventProperties: this._processorOptions.trackLastEnqueuedEventProperties,
retryOptions: this._processorOptions.retryOptions
retryOptions: this._processorOptions.retryOptions,
skipParsingBodyAsJson: this._processorOptions.skipParsingBodyAsJson
}
);

2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/util/constants.ts
Original file line number Diff line number Diff line change
@@ -6,5 +6,5 @@
*/
export const packageJsonInfo = {
name: "@azure/event-hubs",
version: "5.6.1"
version: "5.7.0"
};
26 changes: 16 additions & 10 deletions sdk/eventhub/event-hubs/test/internal/amqp.spec.ts
Original file line number Diff line number Diff line change
@@ -14,23 +14,29 @@ testWithServiceTypes(() => {
assert.isFalse(isAmqpAnnotatedMessage({ body: "hello world" }));
assert.isFalse(
isAmqpAnnotatedMessage(
fromRheaMessage({
message_annotations: {
[Constants.enqueuedTime]: Date.now()
fromRheaMessage(
{
message_annotations: {
[Constants.enqueuedTime]: Date.now()
},
body: undefined
},
body: undefined
})
false
)
)
);

assert.isTrue(
isAmqpAnnotatedMessage(
fromRheaMessage({
message_annotations: {
[Constants.enqueuedTime]: Date.now()
fromRheaMessage(
{
message_annotations: {
[Constants.enqueuedTime]: Date.now()
},
body: undefined
},
body: undefined
}).getRawAmqpMessage()
false
).getRawAmqpMessage()
)
);

296 changes: 212 additions & 84 deletions sdk/eventhub/event-hubs/test/internal/dataTransformer.spec.ts

Large diffs are not rendered by default.

106 changes: 59 additions & 47 deletions sdk/eventhub/event-hubs/test/internal/eventdata.spec.ts
Original file line number Diff line number Diff line change
@@ -48,15 +48,18 @@ testWithServiceTypes(() => {
describe("EventData", function(): void {
describe("fromRheaMessage", function(): void {
it("populates body with the message body", function(): void {
const testEventData = fromRheaMessage(testMessage);
const testEventData = fromRheaMessage(testMessage, false);
testEventData.body.should.equal(testBody);
});

it("populates top-level fields", () => {
const testEventData = fromRheaMessage({
...testMessage,
...{ content_type: "application/json", correlation_id: "cid", message_id: 1 }
});
const testEventData = fromRheaMessage(
{
...testMessage,
...{ content_type: "application/json", correlation_id: "cid", message_id: 1 }
},
false
);
should().equal(testEventData.messageId, 1, "Unexpected messageId found.");
should().equal(
testEventData.contentType,
@@ -68,24 +71,24 @@ testWithServiceTypes(() => {

describe("properties", function(): void {
it("enqueuedTimeUtc gets the enqueued time from system properties", function(): void {
const testEventData = fromRheaMessage(testMessage);
const testEventData = fromRheaMessage(testMessage, false);
testEventData
.enqueuedTimeUtc!.getTime()
.should.equal(testAnnotations["x-opt-enqueued-time"]);
});

it("offset gets the offset from system properties", function(): void {
const testEventData = fromRheaMessage(testMessage);
const testEventData = fromRheaMessage(testMessage, false);
testEventData.offset!.should.equal(testAnnotations["x-opt-offset"]);
});

it("sequenceNumber gets the sequence number from system properties", function(): void {
const testEventData = fromRheaMessage(testMessage);
const testEventData = fromRheaMessage(testMessage, false);
testEventData.sequenceNumber!.should.equal(testAnnotations["x-opt-sequence-number"]);
});

it("partitionKey gets the sequence number from system properties", function(): void {
const testEventData = fromRheaMessage(testMessage);
const testEventData = fromRheaMessage(testMessage, false);
testEventData.partitionKey!.should.equal(testAnnotations["x-opt-partition-key"]);
});

@@ -94,14 +97,17 @@ testWithServiceTypes(() => {
"x-iot-foo-prop": "just-a-foo",
"x-iot-bar-prop": "bar-above-the-rest"
};
const testEventData = fromRheaMessage({
body: testBody,
application_properties: applicationProperties,
message_annotations: {
...testAnnotations,
...extraAnnotations
}
});
const testEventData = fromRheaMessage(
{
body: testBody,
application_properties: applicationProperties,
message_annotations: {
...testAnnotations,
...extraAnnotations
}
},
false
);
testEventData
.enqueuedTimeUtc!.getTime()
.should.equal(testAnnotations["x-opt-enqueued-time"]);
@@ -117,24 +123,27 @@ testWithServiceTypes(() => {
});

it("returns systemProperties for special known properties", function(): void {
const testEventData = fromRheaMessage({
body: testBody,
application_properties: applicationProperties,
message_annotations: testAnnotations,
message_id: "messageId",
user_id: "userId",
to: "to",
subject: "subject",
reply_to: "replyTo",
reply_to_group_id: "replyToGroupId",
content_encoding: "utf-8",
content_type: "application/json",
correlation_id: "id2",
absolute_expiry_time: new Date(0),
creation_time: new Date(0),
group_id: "groupId",
group_sequence: 1
});
const testEventData = fromRheaMessage(
{
body: testBody,
application_properties: applicationProperties,
message_annotations: testAnnotations,
message_id: "messageId",
user_id: "userId",
to: "to",
subject: "subject",
reply_to: "replyTo",
reply_to_group_id: "replyToGroupId",
content_encoding: "utf-8",
content_type: "application/json",
correlation_id: "id2",
absolute_expiry_time: new Date(0),
creation_time: new Date(0),
group_id: "groupId",
group_sequence: 1
},
false
);

testEventData
.enqueuedTimeUtc!.getTime()
@@ -164,20 +173,23 @@ testWithServiceTypes(() => {
"x-date": timestamp,
"x-number": timestamp.getTime()
};
const testEventData = fromRheaMessage({
body: testBody,
application_properties: {
topLevelDate: timestamp,
child: {
nestedDate: timestamp,
children: [timestamp, { deepDate: timestamp }]
const testEventData = fromRheaMessage(
{
body: testBody,
application_properties: {
topLevelDate: timestamp,
child: {
nestedDate: timestamp,
children: [timestamp, { deepDate: timestamp }]
}
},
message_annotations: {
...testAnnotations,
...extraAnnotations
}
},
message_annotations: {
...testAnnotations,
...extraAnnotations
}
});
false
);
testEventData
.enqueuedTimeUtc!.getTime()
.should.equal(testAnnotations["x-opt-enqueued-time"]);

0 comments on commit 8fcdbfe

Please sign in to comment.