Skip to content

Commit

Permalink
[core-amqp][event-hubs][service-bus] move DataTransformer from core-a…
Browse files Browse the repository at this point in the history
…mqp to client packages (Azure#12415)

Part of the list of breaking changes to core-amqp v2 in Azure#12116
Replaces Azure#12320 (precipitated by Azure#12320 (comment))

This change moves the `DataTransformer` interface and `DefaultDataTransformer` class to service-bus and event-hub packages.

When we establish what our data serde strategy is, we can revisit using a shared common serde solution.
  • Loading branch information
chradek authored Nov 10, 2020
1 parent 3dac5ee commit 93444da
Show file tree
Hide file tree
Showing 23 changed files with 328 additions and 97 deletions.
5 changes: 4 additions & 1 deletion sdk/core/core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

## 2.0.0 (Unreleased)

### Breaking Changes
### Breaking changes

- Continuing our work to clean the public API surface that we started in 2.0.0-beta.1, `DataTransformer` and `DefaultDataTransformer` are no longer exported.
`dataTransformer` has been removed from `ConnectionContextBase` and `ConnectionContextBaseParameters`.
This allows us to consider other forms of implementing serializers in the future.
- Previously, `ConnectionConfig.validate()` overridden entityPath if `undefined` with `String(undefined) = "undefined"`. This has been updated to retain `undefined` in the validation.
[PR 12321](https://github.com/Azure/azure-sdk-for-js/pull/12321)

Expand Down
14 changes: 0 additions & 14 deletions sdk/core/core-amqp/review/core-amqp.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ export interface ConnectionContextBase {
connection: Connection;
connectionId: string;
connectionLock: string;
dataTransformer: DataTransformer;
negotiateClaimLock: string;
refreshConnection: () => void;
wasConnectionCloseCalled: boolean;
Expand Down Expand Up @@ -373,23 +372,10 @@ export const Constants: {
export interface CreateConnectionContextBaseParameters {
config: ConnectionConfig;
connectionProperties: ConnectionProperties;
dataTransformer?: DataTransformer;
isEntityPathRequired?: boolean;
operationTimeoutInMs?: number;
}

// @public
export interface DataTransformer {
decode: (body: any) => any;
encode: (body: any) => any;
}

// @public
export class DefaultDataTransformer implements DataTransformer {
decode(body: any): any;
encode(body: any): any;
}

// @public
export const defaultLock: AsyncLock;

Expand Down
13 changes: 0 additions & 13 deletions sdk/core/core-amqp/src/ConnectionContextBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import { Connection, ConnectionOptions, generate_uuid } from "rhea-promise";
import { CbsClient } from "./cbs";
import { DataTransformer, DefaultDataTransformer } from "./dataTransformer";
import { ConnectionConfig } from "./connectionConfig/connectionConfig";

import { Constants } from "./util/constants";
Expand Down Expand Up @@ -44,12 +43,6 @@ export interface ConnectionContextBase {
* called on the connection object.
*/
wasConnectionCloseCalled: boolean;
/**
* @property {DataTransformer} dataTransformer A DataTransformer object that has methods named
* - encode Responsible for encoding the AMQP message before sending it on the wire.
* - decode Responsible for decoding the received AMQP message before passing it to the customer.
*/
dataTransformer: DataTransformer;
/**
* @property {CbsClient} cbsSession A reference to the cbs session ($cbs endpoint) on the
* underlying AMQP connection for the EventHub Client.
Expand Down Expand Up @@ -95,11 +88,6 @@ export interface CreateConnectionContextBaseParameters {
* the AMQP connection.
*/
connectionProperties: ConnectionProperties;
/**
* @property {DataTransformer} [dataTransformer] The datatransformer to be used for encoding and
* decoding messages. Default value: DefaultDataTransformer
*/
dataTransformer?: DataTransformer;
/**
* @property {boolean} [isEntityPathRequired] Determines whether entity path should be a part of
* the connection config. If `true` it must be present, `false` otherwise. Default value false.
Expand Down Expand Up @@ -178,7 +166,6 @@ export const ConnectionContextBase = {
connectionId: connection.id,
cbsSession: new CbsClient(connection, connectionLock),
config: parameters.config,
dataTransformer: parameters.dataTransformer || new DefaultDataTransformer(),
refreshConnection() {
const connection = new Connection(connectionOptions);
const connectionLock = `${Constants.establishConnection}-${generate_uuid()}`;
Expand Down
1 change: 0 additions & 1 deletion sdk/core/core-amqp/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

export { RequestResponseLink, SendRequestOptions } from "./requestResponseLink";
export { retry, RetryOptions, RetryConfig, RetryOperationType, RetryMode } from "./retry";
export { DataTransformer, DefaultDataTransformer } from "./dataTransformer";
export { TokenType } from "./auth/token";

export { ConnectionConfig, ConnectionConfigOptions } from "./connectionConfig/connectionConfig";
Expand Down
5 changes: 1 addition & 4 deletions sdk/core/core-amqp/test/context.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import * as chai from "chai";
const should = chai.should();
import { CbsClient, ConnectionConfig, ConnectionContextBase, DefaultDataTransformer } from "../src";
import { CbsClient, ConnectionConfig, ConnectionContextBase } from "../src";
import { Connection } from "rhea-promise";
import { isNode } from "../src/util/utils";

Expand All @@ -26,14 +26,12 @@ describe("ConnectionContextBase", function() {
should.exist(context.connectionId);
should.exist(context.connectionLock);
should.exist(context.negotiateClaimLock);
should.exist(context.dataTransformer);
context.wasConnectionCloseCalled.should.equal(false);
context.connection.should.instanceOf(Connection);
context.connection.options.properties!.product.should.equal("MSJSClient");
context.connection.options.properties!["user-agent"].should.equal("/js-amqp-client");
context.connection.options.properties!.version.should.equal("1.0.0");
context.cbsSession.should.instanceOf(CbsClient);
context.dataTransformer.should.instanceOf(DefaultDataTransformer);
done();
});

Expand Down Expand Up @@ -133,7 +131,6 @@ describe("ConnectionContextBase", function() {
should.exist(context.connectionId);
should.exist(context.connectionLock);
should.exist(context.negotiateClaimLock);
should.exist(context.dataTransformer);
context.wasConnectionCloseCalled.should.equal(false);
context.cbsSession.should.instanceOf(CbsClient);

Expand Down
4 changes: 3 additions & 1 deletion sdk/eventhub/event-hubs/rollup.base.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ export function browserConfig(test = false) {

cjs({
namedExports: {
"@opentelemetry/api": ["CanonicalCode", "SpanKind", "TraceFlags"]
"@opentelemetry/api": ["CanonicalCode", "SpanKind", "TraceFlags"],
chai: ["should", "assert"],
assert: ["equal", "deepEqual", "notEqual"]
}
}),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,16 @@
// Licensed under the MIT license.

import { message } from "rhea-promise";
import { logErrorStackTrace, logger } from "./log";
import isBuffer from "is-buffer";
import { Buffer } from "buffer";

/**
* Describes the transformations that can be performed to encode/decode the data before sending it
* on (or receiving it from) the wire.
*/
export interface DataTransformer {
/**
* @property {Function} encode A function that takes the body property from an EventData object
* and returns an encoded body (some form of AMQP type).
*/
encode: (body: any) => any;
/**
* @property {Function} decode A function that takes the body property from an AMQP message
* and returns the decoded message body. If it cannot decode the body then it returns the body
* as-is.
*/
decode: (body: any) => any;
}
import { logErrorStackTrace, logger } from "./log";

/**
* The default data transformer that will be used by the Azure SDK.
* @internal
* @ingore
*/
export class DefaultDataTransformer implements DataTransformer {
export const defaultDataTransformer = {
/**
* A function that takes the body property from an EventData object
* and returns an encoded body (some form of AMQP type).
Expand Down Expand Up @@ -62,7 +46,7 @@ export class DefaultDataTransformer implements DataTransformer {
}
}
return result;
}
},

/**
* @property {Function} [decode] A function that takes the body property from an AMQP message
Expand Down Expand Up @@ -99,4 +83,4 @@ export class DefaultDataTransformer implements DataTransformer {
}
return processedBody;
}
}
};
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/src/eventDataBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { throwTypeErrorIfParameterMissing } from "./util/error";
import { Span, SpanContext } from "@opentelemetry/api";
import { TRACEPARENT_PROPERTY, instrumentEventData } from "./diagnostics/instrumentEventData";
import { createMessageSpan } from "./diagnostics/messageSpan";
import { defaultDataTransformer } from "./dataTransformer";

/**
* The amount of bytes to reserve as overhead for a small message.
Expand Down Expand Up @@ -303,7 +304,7 @@ export class EventDataBatchImpl implements EventDataBatch {

// Convert EventData to RheaMessage.
const amqpMessage = toRheaMessage(eventData, this._partitionKey);
amqpMessage.body = this._context.dataTransformer.encode(eventData.body);
amqpMessage.body = defaultDataTransformer.encode(eventData.body);
const encodedMessage = message.encode(amqpMessage);

let currentSize = this._sizeInBytes;
Expand Down
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { ConnectionContext } from "./connectionContext";
import { LinkEntity } from "./linkEntity";
import { EventPosition, getEventPositionFilter } from "./eventPosition";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { defaultDataTransformer } from "./dataTransformer";

/**
* @ignore
Expand Down Expand Up @@ -230,7 +231,7 @@ export class EventHubReceiver extends LinkEntity {

const data: EventDataInternal = fromRheaMessage(context.message);
const receivedEventData: ReceivedEventData = {
body: this._context.dataTransformer.decode(context.message.body),
body: defaultDataTransformer.decode(context.message.body),
properties: data.properties,
offset: data.offset!,
sequenceNumber: data.sequenceNumber!,
Expand Down
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { SendOptions } from "./models/public";
import { getRetryAttemptTimeoutInMs } from "./util/retries";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { EventDataBatch, isEventDataBatch } from "./eventDataBatch";
import { defaultDataTransformer } from "./dataTransformer";

/**
* Describes the EventHubSender that will send event data to EventHub.
Expand Down Expand Up @@ -322,7 +323,7 @@ export class EventHubSender extends LinkEntity {
// Convert EventData to RheaMessage.
for (let i = 0; i < events.length; i++) {
const message = toRheaMessage(events[i], partitionKey);
message.body = this._context.dataTransformer.encode(events[i].body);
message.body = defaultDataTransformer.encode(events[i].body);
messages[i] = message;
}
// Encode every amqp message and then convert every encoded message to amqp data section
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import * as chai from "chai";
const should = chai.should();
import * as assert from "assert";
import isBuffer from "is-buffer";

import { DefaultDataTransformer } from "../src";
import { defaultDataTransformer } from "../src/dataTransformer";

describe("DataTransformer", function() {
const objectBody: any = {
Expand Down Expand Up @@ -38,9 +37,9 @@ describe("DataTransformer", function() {
const nullBody: null = null;
const undefinedBody: undefined = undefined;
const emptyStringBody: string = "";
const bufferbody: Buffer = Buffer.from("zzz", "utf8");
const bufferBody: Buffer = Buffer.from("zzz", "utf8");
const hexBufferBody: Buffer = Buffer.from("7468697320697320612074c3a97374", "hex");
const transformer = new DefaultDataTransformer();
const transformer = defaultDataTransformer;

it("should correctly encode/decode a string message body", function(done) {
const encoded: any = transformer.encode(stringBody);
Expand Down Expand Up @@ -115,11 +114,11 @@ describe("DataTransformer", function() {
});

it("should correctly encode/decode a buffer message body", function(done) {
const encoded: any = transformer.encode(bufferbody);
const encoded: any = transformer.encode(bufferBody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
assert.deepEqual(decoded, bufferbody);
assert.deepEqual(decoded, bufferBody);
done();
});

Expand Down Expand Up @@ -184,8 +183,8 @@ describe("DataTransformer", function() {
});

it("should correctly decode a buffer message body", function(done) {
const decoded: any = transformer.decode(bufferbody);
assert.deepEqual(decoded, bufferbody);
const decoded: any = transformer.decode(bufferBody);
assert.deepEqual(decoded, bufferBody);
done();
});

Expand Down
4 changes: 3 additions & 1 deletion sdk/servicebus/service-bus/rollup.base.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ export function browserConfig(test = false) {
namedExports: {
events: ["EventEmitter"],
long: ["ZERO"],
"@opentelemetry/api": ["CanonicalCode", "SpanKind", "TraceFlags"]
"@opentelemetry/api": ["CanonicalCode", "SpanKind", "TraceFlags"],
chai: ["should", "assert"],
assert: ["equal", "deepEqual", "notEqual"]
}
}),

Expand Down
1 change: 0 additions & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ export class BatchingReceiverLite {

this._createServiceBusMessage = (context: MessageAndDelivery) => {
return new ServiceBusMessageImpl(
_connectionContext.dataTransformer,
context.message!,
context.delivery!,
true,
Expand Down
8 changes: 4 additions & 4 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import { OperationOptionsBase } from "./../modelsToBeSharedWithEventHubs";
import { AbortSignalLike } from "@azure/abort-controller";
import { ReceiveMode } from "../models";
import { translateServiceBusError } from "../serviceBusError";
import { defaultDataTransformer } from "../dataTransformer";

/**
* @internal
Expand Down Expand Up @@ -489,7 +490,7 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
for (const msg of messages) {
const decodedMessage = RheaMessageUtil.decode(msg.message);
const message = fromRheaMessage(decodedMessage as any);
message.body = this._context.dataTransformer.decode(message.body);
message.body = defaultDataTransformer.decode(message.body);
messageList.push(message);
this._lastPeekedSequenceNumber = message.sequenceNumber!;
}
Expand Down Expand Up @@ -589,7 +590,7 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
if (!item.messageId) item.messageId = generate_uuid();
item.scheduledEnqueueTimeUtc = scheduledEnqueueTimeUtc;
const amqpMessage = toRheaMessage(item);
amqpMessage.body = this._context.dataTransformer.encode(amqpMessage.body);
amqpMessage.body = defaultDataTransformer.encode(amqpMessage.body);

try {
const entry: any = {
Expand Down Expand Up @@ -802,7 +803,6 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
for (const msg of messages) {
const decodedMessage = RheaMessageUtil.decode(msg.message);
const message = new ServiceBusMessageImpl(
this._context.dataTransformer,
decodedMessage as any,
{ tag: msg["lock-token"] } as any,
false,
Expand Down Expand Up @@ -1017,7 +1017,7 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
);
const result = await this._makeManagementRequest(request, receiverLogger, options);
return result.body["session-state"]
? this._context.dataTransformer.decode(result.body["session-state"])
? defaultDataTransformer.decode(result.body["session-state"])
: result.body["session-state"];
} catch (err) {
const error = translateServiceBusError(err);
Expand Down
5 changes: 3 additions & 2 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { CreateMessageBatchOptions } from "../models";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { AbortSignalLike } from "@azure/abort-controller";
import { translateServiceBusError } from "../serviceBusError";
import { defaultDataTransformer } from "../dataTransformer";

/**
* @internal
Expand Down Expand Up @@ -355,7 +356,7 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
throwErrorIfConnectionClosed(this._context);
try {
const amqpMessage = toRheaMessage(data);
amqpMessage.body = this._context.dataTransformer.encode(data.body);
amqpMessage.body = defaultDataTransformer.encode(data.body);

// TODO: this body of logic is really similar to what's in sendMessages. Unify what we can.
let encodedMessage;
Expand Down Expand Up @@ -413,7 +414,7 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
// Convert Message to AmqpMessage.
for (let i = 0; i < inputMessages.length; i++) {
const amqpMessage = toRheaMessage(inputMessages[i]);
amqpMessage.body = this._context.dataTransformer.encode(inputMessages[i].body);
amqpMessage.body = defaultDataTransformer.encode(inputMessages[i].body);
amqpMessages[i] = amqpMessage;
try {
encodedMessages[i] = RheaMessageUtil.encode(amqpMessage);
Expand Down
1 change: 0 additions & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ export class StreamingReceiver extends MessageReceiver {
}

const bMessage: ServiceBusMessageImpl = new ServiceBusMessageImpl(
this._context.dataTransformer,
context.message!,
context.delivery!,
true,
Expand Down
Loading

0 comments on commit 93444da

Please sign in to comment.