Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[event-hubs] add AmqpAnnotatedMessage support #15939

Merged
merged 4 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
# Release History

## 5.5.3 (Unreleased)
## 5.6.0 (Unreleased)

### Features Added

- Adds the `contentType`, `correlationId`, and `messageId` AMQP properties as top-level fields on `EventData` and `ReceivedEventData`.

- Enable encoding the body of a message to the 'value' or 'sequence' sections (via AmqpAnnotatedMessage.bodyType). Using this encoding is not required but does allow you to take advantage of native AMQP serialization for supported primitives or sequences.

More information about the AMQP message body type can be found in the AMQP specification: [link](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format)

### Breaking Changes

### Key Bugs Fixed

### Fixed


## 5.5.2 (2021-06-10)

### Bug fixes
Expand Down
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/event-hubs",
"sdk-type": "client",
"version": "5.5.3",
"version": "5.6.0",
"description": "Azure Event Hubs SDK for JS.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down Expand Up @@ -129,6 +129,7 @@
"assert": "^1.4.1",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"chai-exclude": "^2.0.2",
"chai-string": "^1.5.0",
"cross-env": "^7.0.2",
"debug": "^4.1.1",
Expand Down
12 changes: 10 additions & 2 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
```ts

import { AbortSignalLike } from '@azure/abort-controller';
import { AmqpAnnotatedMessage } from '@azure/core-amqp';
import { MessagingError } from '@azure/core-amqp';
import { NamedKeyCredential } from '@azure/core-auth';
import { OperationTracingOptions } from '@azure/core-tracing';
Expand Down Expand Up @@ -54,6 +55,9 @@ export const earliestEventPosition: EventPosition;
// @public
export interface EventData {
body: any;
contentType?: string;
correlationId?: string | number | Buffer;
messageId?: string | number | Buffer;
properties?: {
[key: string]: any;
};
Expand All @@ -72,7 +76,7 @@ export interface EventDataBatch {
// @internal
readonly partitionKey?: string;
readonly sizeInBytes: number;
tryAdd(eventData: EventData, options?: TryAddOptions): boolean;
tryAdd(eventData: EventData | AmqpAnnotatedMessage, options?: TryAddOptions): boolean;
}

// @public
Expand Down Expand Up @@ -129,7 +133,7 @@ export class EventHubProducerClient {
getEventHubProperties(options?: GetEventHubPropertiesOptions): Promise<EventHubProperties>;
getPartitionIds(options?: GetPartitionIdsOptions): Promise<Array<string>>;
getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise<PartitionProperties>;
sendBatch(batch: EventData[], options?: SendBatchOptions): Promise<void>;
sendBatch(batch: EventData[] | AmqpAnnotatedMessage[], options?: SendBatchOptions): Promise<void>;
sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise<void>;
}

Expand Down Expand Up @@ -239,7 +243,11 @@ export type ProcessInitializeHandler = (context: PartitionContext) => Promise<vo
// @public
export interface ReceivedEventData {
body: any;
contentType?: string;
correlationId?: string | number | Buffer;
enqueuedTimeUtc: Date;
getRawAmqpMessage(): AmqpAnnotatedMessage;
messageId?: string | number | Buffer;
offset: number;
partitionKey: string | null;
properties?: {
Expand Down
141 changes: 105 additions & 36 deletions sdk/eventhub/event-hubs/src/dataTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,19 @@ import { message } from "rhea-promise";
import isBuffer from "is-buffer";
import { Buffer } from "buffer";
import { logErrorStackTrace, logger } from "./log";
import { isObjectWithProperties } from "./util/typeGuards";

/**
* The allowed AMQP message body types.
* @internal
*/
export type BodyTypes = "data" | "value" | "sequence";

/** @internal */
export const dataSectionTypeCode = 0x75 as const;
/** @internal */
export const sequenceSectionTypeCode = 0x76 as const;
/** @internal */
export const valueSectionTypeCode = 0x77 as const;

/**
* The default data transformer that will be used by the Azure SDK.
Expand All @@ -17,21 +29,25 @@ export const defaultDataTransformer = {
* and returns an encoded body (some form of AMQP type).
*
* @param body - The AMQP message body
* @returns The encoded AMQP message body as an AMQP Data type
* (data section in rhea terms). Section object with following properties:
* - typecode: 117 (0x75)
* - content: The given AMQP message body as a Buffer.
* - multiple: true | undefined.
* @param bodyType - The AMQP section to story the body in.
* @returns The encoded AMQP message body as an AMQP Data/Sequence/Value section.
*/
encode(body: unknown): any {
encode(body: unknown, bodyType: BodyTypes): any {
let result: any;
if (isBuffer(body)) {
// string, undefined, null, boolean, array, object, number should end up here
// coercing undefined to null as that will ensure that null value will be given to the
// customer on receive.
if (body === undefined) body = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd throw in a blank line here - it's really easy to think the 'if' and the 'underlying else's' all are part of the same chain.

With that said - the 'body being null' thing seems incorrect to me. I know this came from SB, so it's potentially wrong there as well.

For instance, if you pass null you get this from JSON.stringify:

> Buffer.from(JSON.stringify(null), "utf8")
<Buffer 6e 75 6c 6c>

> Buffer.from(JSON.stringify(null), "utf8").toString()
'null'

I haven't looked, but perhaps this all plays nicely with the rhea encoding method for message.<data|value|sequence>_section() methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only time null would have some transformation done on it is for the data_section, which case it will be JSON stringified. I'd agree that this would cause a problem, except we also attempt to JSON parse any data_section bodies. This would result in the body being null, not "null".

That said, we might run into a problem is when talking to the other languages. There's no guarantee other languages are also calling JSON.parse (in fact I doubt they are), so they may see 'null' as a string rather than a null type. It looks like this is already existing behavior, but since null and 'null' both result in null being returned from JSON.parse, updating this is safe.

if (bodyType === "value") {
// TODO: Expose value_section from `rhea` similar to the data_section and sequence_section.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is actually my TODO but we should probably file an issue to push this change into rhea.

// Right now there isn't a way to create a value section officially.
result = message.data_section(body);
result.typecode = valueSectionTypeCode;
} else if (bodyType === "sequence") {
result = message.sequence_section(body);
} else if (isBuffer(body)) {
result = message.data_section(body);
} else {
// string, undefined, null, boolean, array, object, number should end up here
// coercing undefined to null as that will ensure that null value will be given to the
// customer on receive.
if (body === undefined) body = null;
try {
const bodyStr = JSON.stringify(body);
result = message.data_section(Buffer.from(bodyStr, "utf8"));
Expand All @@ -49,38 +65,91 @@ export const defaultDataTransformer = {
},

/**
* A function that takes the body property from an AMQP message
* (an AMQP Data type (data section in rhea terms)) and returns the decoded message body.
* If it cannot decode the body then it returns the body
* as-is.
* @param body - The AMQP message body
* @returns decoded body or the given body as-is.
* A function that takes the body property from an AMQP message, which can come from either
* the 'data', 'value' or 'sequence' sections of an AMQP message.
*
* If the body is not a JSON string the the raw contents will be returned, along with the bodyType
* indicating which part of the AMQP message the body was decoded from.
*
* @param body - The AMQP message body as received from rhea.
* @returns The decoded/raw body and the body type.
*/
decode(body: unknown): any {
let processedBody: any = body;
decode(body: unknown | RheaAmqpSection): { body: unknown; bodyType: BodyTypes } {
try {
if (isObjectWithProperties(body, ["content"]) && isBuffer(body.content)) {
// This indicates that we are getting the AMQP described type. Let us try decoding it.
processedBody = body.content;
}
try {
// Trying to stringify and JSON.parse() anything else will fail flat and we shall return
// the original type back
const bodyStr: string = processedBody.toString("utf8");
processedBody = JSON.parse(bodyStr);
} catch (err) {
logger.verbose(
"[decode] An error occurred while trying JSON.parse() on the received body. " +
"The error is %O",
err
);
if (isRheaAmqpSection(body)) {
switch (body.typecode) {
case dataSectionTypeCode:
return { body: tryToJsonDecode(body.content), bodyType: "data" };
case sequenceSectionTypeCode:
return { body: body.content, bodyType: "sequence" };
case valueSectionTypeCode:
return { body: body.content, bodyType: "value" };
}
} else {
// TODO: test case
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a TODO because it's not done yet or is there an interesting challenge here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, I meant to erase that but the problem with adding TODOs is you have to actually go back and grep them...

if (isBuffer(body)) {
return { body: tryToJsonDecode(body), bodyType: "data" };
}

return { body, bodyType: "value" };
}
} catch (err) {
logger.verbose(
"[decode] An error occurred while decoding the received message body. The error is: %O",
err
);
throw err;
}
return processedBody;
}
};

/**
* Attempts to decode 'body' as a JSON string. If it fails it returns body
* verbatim.
*
* @param body - An AMQP message body.
* @returns A JSON decoded object, or body if body was not a JSON string.
*
* @internal
*/
function tryToJsonDecode(body: unknown): unknown {
let processedBody: any = body;
try {
// Trying to stringify and JSON.parse() anything else will fail flat and we shall return
// the original type back
const bodyStr: string = processedBody.toString("utf8");
processedBody = JSON.parse(bodyStr);
} catch (err) {
logger.verbose(
"[decode] An error occurred while trying JSON.parse() on the received body. The error is %O",
err
);
}
return processedBody;
}

/**
* Mirror of the internal Section interface in rhea.
*
* @internal
*/
export interface RheaAmqpSection {
typecode:
| typeof dataSectionTypeCode
| typeof sequenceSectionTypeCode
| typeof valueSectionTypeCode;
content: any;
}

/** @internal */
export function isRheaAmqpSection(
possibleSection: any | RheaAmqpSection
): possibleSection is RheaAmqpSection {
return (
possibleSection != null &&
typeof possibleSection.typecode === "number" &&
(possibleSection.typecode === dataSectionTypeCode ||
possibleSection.typecode === valueSectionTypeCode ||
possibleSection.typecode === sequenceSectionTypeCode)
);
}
26 changes: 20 additions & 6 deletions sdk/eventhub/event-hubs/src/diagnostics/instrumentEventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing";
import { Span, SpanContext } from "@azure/core-tracing";
import { EventData } from "../eventData";
import { AmqpAnnotatedMessage } from "@azure/core-amqp";
import { EventData, isAmqpAnnotatedMessage } from "../eventData";

/**
* @hidden
Expand All @@ -14,20 +15,33 @@ export const TRACEPARENT_PROPERTY = "Diagnostic-Id";
* Populates the `EventData` with `SpanContext` info to support trace propagation.
* Creates and returns a copy of the passed in `EventData` unless the `EventData`
* has already been instrumented.
* @param eventData - The `EventData` to instrument.
* @param eventData - The `EventData` or `AmqpAnnotatedMessage` to instrument.
* @param span - The `Span` containing the context to propagate tracing information.
*/
export function instrumentEventData(eventData: EventData, span: Span): EventData {
if (eventData.properties && eventData.properties[TRACEPARENT_PROPERTY]) {
export function instrumentEventData(
eventData: EventData | AmqpAnnotatedMessage,
span: Span
): EventData {
const props = isAmqpAnnotatedMessage(eventData)
? eventData.applicationProperties
: eventData.properties;

if (props && props[TRACEPARENT_PROPERTY]) {
return eventData;
}

const copiedProps = { ...props };

// create a copy so the original isn't modified
eventData = { ...eventData, properties: { ...eventData.properties } };
if (isAmqpAnnotatedMessage(eventData)) {
eventData = { ...eventData, applicationProperties: copiedProps };
} else {
eventData = { ...eventData, properties: copiedProps };
}

const traceParent = getTraceParentHeader(span.spanContext());
if (traceParent) {
eventData.properties![TRACEPARENT_PROPERTY] = traceParent;
copiedProps[TRACEPARENT_PROPERTY] = traceParent;
}

return eventData;
Expand Down
Loading