Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Remove the middle layer of EventHubProducer #9548

Merged
merged 4 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 115 additions & 30 deletions sdk/eventhub/event-hubs/src/eventHubProducerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
// Licensed under the MIT license.

import { isTokenCredential, TokenCredential } from "@azure/core-amqp";
import { getTracer } from "@azure/core-tracing";
import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { ConnectionContext } from "./connectionContext";
import { instrumentEventData, TRACEPARENT_PROPERTY } from "./diagnostics/instrumentEventData";
import { createMessageSpan } from "./diagnostics/messageSpan";
import { EventData } from "./eventData";
import { EventDataBatch, isEventDataBatch } from "./eventDataBatch";
import { EventDataBatch, EventDataBatchImpl, isEventDataBatch } from "./eventDataBatch";
import { EventHubSender } from './eventHubSender';
import { createConnectionContext } from "./impl/eventHubClient";
import { logErrorStackTrace, logger } from './log';
import { EventHubProperties, PartitionProperties } from "./managementClient";
import {
CreateBatchOptions,
Expand All @@ -15,9 +21,8 @@ import {
GetPartitionPropertiesOptions,
SendBatchOptions
} from "./models/public";
import { EventHubProducer } from "./sender";
import { throwErrorIfConnectionClosed } from "./util/error";
import { OperationOptions } from "./util/operationOptions";
import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error";
import { getParentSpan, OperationOptions } from "./util/operationOptions";

/**
* The `EventHubProducerClient` class is used to send events to an Event Hub.
Expand All @@ -42,10 +47,9 @@ export class EventHubProducerClient {
*/
private _clientOptions: EventHubClientOptions;
/**
* Map of partitionId to producers
* Map of partitionId to senders
*/
private _producersMap: Map<string, EventHubProducer>;

private _sendersMap: Map<string, EventHubSender>;
/**
* @property
* @readonly
Expand Down Expand Up @@ -135,7 +139,7 @@ export class EventHubProducerClient {
this._clientOptions = options4 || {};
}

this._producersMap = new Map();
this._sendersMap = new Map();
}

/**
Expand All @@ -152,23 +156,43 @@ export class EventHubProducerClient {
* @throws Error if the underlying connection has been closed, create a new EventHubProducerClient.
* @throws AbortError if the operation is cancelled via the abortSignal in the options.
*/
async createBatch(options?: CreateBatchOptions): Promise<EventDataBatch> {
async createBatch(options: CreateBatchOptions = {}): Promise<EventDataBatch> {
throwErrorIfConnectionClosed(this._context);

if (options && options.partitionId && options.partitionKey) {
if (options.partitionId && options.partitionKey) {
throw new Error("partitionId and partitionKey cannot both be set when creating a batch");
}

let producer = this._producersMap.get("");

if (!producer) {
producer = new EventHubProducer(this._context, {
retryOptions: this._clientOptions.retryOptions
});
this._producersMap.set("", producer);
let sender = this._sendersMap.get("");
if (!sender) {
sender = EventHubSender.create(this._context);
this._sendersMap.set("", sender);
}

return producer.createBatch(options);
let maxMessageSize = await sender.getMaxMessageSize({
retryOptions: this._clientOptions.retryOptions,
abortSignal: options.abortSignal
});

if (options.maxSizeInBytes) {
if (options.maxSizeInBytes > maxMessageSize) {
const error = new Error(
`Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link.`
);
logger.warning(
`[${this._context.connectionId}] Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link. ${error}`
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
);
logErrorStackTrace(error);
throw error;
}
maxMessageSize = options.maxSizeInBytes;
}
return new EventDataBatchImpl(
this._context,
maxMessageSize,
options.partitionKey,
options.partitionId
);
}

/**
Expand Down Expand Up @@ -206,9 +230,14 @@ export class EventHubProducerClient {
options: SendBatchOptions | OperationOptions = {}
): Promise<void> {
throwErrorIfConnectionClosed(this._context);
throwTypeErrorIfParameterMissing(this._context.connectionId, "sendBatch", "batch", batch);

let partitionId: string | undefined;
let partitionKey: string | undefined;

// link message span contexts
let spanContextsToLink: SpanContext[] = [];

if (isEventDataBatch(batch)) {
// For batches, partitionId and partitionKey would be set on the batch.
partitionId = batch.partitionId;
Expand All @@ -224,32 +253,65 @@ export class EventHubProducerClient {
`The partitionId (${unexpectedOptions.partitionId}) set on sendBatch does not match the partitionId (${partitionId}) set when creating the batch.`
);
}

spanContextsToLink = batch._messageSpanContexts;
} else {
if (!Array.isArray(batch)) {
batch = [batch];
}

// For arrays of events, partitionId and partitionKey would be set in the options.
const expectedOptions = options as SendBatchOptions;
partitionId = expectedOptions.partitionId;
partitionKey = expectedOptions.partitionKey;

for (let i = 0; i < batch.length; i++) {
const event = batch[i];
if (!event.properties || !event.properties[TRACEPARENT_PROPERTY]) {
const messageSpan = createMessageSpan(getParentSpan(options.tracingOptions));
// since these message spans are created from same context as the send span,
// these message spans don't need to be linked.
// replace the original event with the instrumented one
batch[i] = instrumentEventData(batch[i], messageSpan);
messageSpan.end();
}
}
}
if (partitionId && partitionKey) {
throw new Error(
`The partitionId (${partitionId}) and partitionKey (${partitionKey}) cannot both be specified.`
);
}

if (!partitionId) {
// The producer map requires that partitionId be a string.
partitionId = "";
let sender = this._sendersMap.get(partitionId || "");
chradek marked this conversation as resolved.
Show resolved Hide resolved
if (!sender) {
sender = EventHubSender.create(this._context, partitionId);
this._sendersMap.set(partitionId || "", sender);
}

let producer = this._producersMap.get(partitionId);
if (!producer) {
producer = new EventHubProducer(this._context, {
retryOptions: this._clientOptions.retryOptions,
partitionId: partitionId === "" ? undefined : partitionId
const sendSpan = this._createSendSpan(
getParentSpan(options.tracingOptions),
spanContextsToLink
);

try {
const result = await sender.send(batch, {
...options,
partitionId,
partitionKey,
retryOptions: this._clientOptions.retryOptions
});
sendSpan.setStatus({ code: CanonicalCode.OK });
return result;
} catch (error) {
sendSpan.setStatus({
code: CanonicalCode.UNKNOWN,
message: error.message
});
this._producersMap.set(partitionId, producer);
throw error;
} finally {
sendSpan.end();
}
return producer.send(batch, options);
}

/**
Expand All @@ -261,10 +323,10 @@ export class EventHubProducerClient {
async close(): Promise<void> {
await this._context.close();

for (const pair of this._producersMap) {
for (const pair of this._sendersMap) {
await pair[1].close();
}
this._producersMap.clear();
this._sendersMap.clear();
}

/**
Expand Down Expand Up @@ -317,4 +379,27 @@ export class EventHubProducerClient {
retryOptions: this._clientOptions.retryOptions
});
}

private _createSendSpan(
parentSpan?: Span | SpanContext | null,
spanContextsToLink: SpanContext[] = []
): Span {
const links: Link[] = spanContextsToLink.map((context) => {
return {
context
};
});
const tracer = getTracer();
const span = tracer.startSpan("Azure.EventHubs.send", {
kind: SpanKind.CLIENT,
parent: parentSpan,
links
});

span.setAttribute("az.namespace", "Microsoft.EventHub");
span.setAttribute("message_bus.destination", this._context.config.entityPath);
span.setAttribute("peer.address", this._context.config.host);

return span;
}
}
40 changes: 8 additions & 32 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,38 +297,6 @@ export class EventHubSender extends LinkEntity {
options?: SendOptions & EventHubProducerOptions
): Promise<void> {
try {
// throw an error if partition key and partition id are both defined
if (
options &&
typeof options.partitionKey === "string" &&
typeof options.partitionId === "string"
) {
const error = new Error(
"Partition key is not supported when using producers that were created using a partition id."
);
logger.warning(
"[%s] Partition key is not supported when using producers that were created using a partition id. %O",
this._context.connectionId,
error
);
logErrorStackTrace(error);
throw error;
}

// throw an error if partition key is different than the one provided in the options.
if (isEventDataBatch(events) && options && options.partitionKey) {
const error = new Error(
"Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead."
);
logger.warning(
"[%s] Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead. %O",
this._context.connectionId,
error
);
logErrorStackTrace(error);
throw error;
}

logger.info(
"[%s] Sender '%s', trying to send EventData[].",
this._context.connectionId,
Expand All @@ -337,8 +305,16 @@ export class EventHubSender extends LinkEntity {

let encodedBatchMessage: Buffer | undefined;
if (isEventDataBatch(events)) {
if (events.count === 0) {
logger.info(`[${this._context.connectionId}] Empty batch was passsed. No events to send.`);
return;
}
encodedBatchMessage = events._generateMessage();
} else {
if (events.length === 0) {
logger.info(`[${this._context.connectionId}] Empty array was passed. No events to send.`);
return;
}
const partitionKey = (options && options.partitionKey) || undefined;
const messages: AmqpMessage[] = [];
// Convert EventData to AmqpMessage.
Expand Down
Loading