diff --git a/sdk/eventhub/event-hubs/src/eventDataBatch.ts b/sdk/eventhub/event-hubs/src/eventDataBatch.ts index 75b10d6bc562..5299b3370519 100644 --- a/sdk/eventhub/event-hubs/src/eventDataBatch.ts +++ b/sdk/eventhub/event-hubs/src/eventDataBatch.ts @@ -138,10 +138,15 @@ export class EventDataBatchImpl implements EventDataBatch { * @property Describes the amqp connection context for the Client. */ private _context: ConnectionContext; + /** + * @property The Id of the partition to which the batch is expected to be sent to. + * Specifying this will throw an error if the batch was created using a `paritionKey`. + */ + private _partitionId?: string; /** * @property A value that is hashed to produce a partition assignment. * It guarantees that messages with the same partitionKey end up in the same partition. - * Specifying this will throw an error if the producer was created using a `paritionId`. + * Specifying this will throw an error if the batch was created using a `paritionId`. */ private _partitionKey?: string; /** @@ -183,11 +188,12 @@ export class EventDataBatchImpl implements EventDataBatch { context: ConnectionContext, maxSizeInBytes: number, partitionKey?: string, - private _partitionId?: string + partitionId?: string ) { this._context = context; this._maxSizeInBytes = maxSizeInBytes; - this._partitionKey = partitionKey; + this._partitionKey = partitionKey != undefined ? String(partitionKey) : partitionKey; + this._partitionId = partitionId != undefined ? String(partitionId) : partitionId; this._sizeInBytes = 0; this._count = 0; } diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index ac353914b1fc..25a3f5facdcb 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -2,10 +2,16 @@ // Licensed under the MIT license. import { isTokenCredential, TokenCredential } from "@azure/core-amqp"; +import { getTracer } from "@azure/core-tracing"; +import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api"; import { ConnectionContext } from "./connectionContext"; +import { instrumentEventData, TRACEPARENT_PROPERTY } from "./diagnostics/instrumentEventData"; +import { createMessageSpan } from "./diagnostics/messageSpan"; import { EventData } from "./eventData"; -import { EventDataBatch, isEventDataBatch } from "./eventDataBatch"; +import { EventDataBatch, EventDataBatchImpl, isEventDataBatch } from "./eventDataBatch"; +import { EventHubSender } from './eventHubSender'; import { createConnectionContext } from "./impl/eventHubClient"; +import { logErrorStackTrace, logger } from './log'; import { EventHubProperties, PartitionProperties } from "./managementClient"; import { CreateBatchOptions, @@ -15,9 +21,8 @@ import { GetPartitionPropertiesOptions, SendBatchOptions } from "./models/public"; -import { EventHubProducer } from "./sender"; -import { throwErrorIfConnectionClosed } from "./util/error"; -import { OperationOptions } from "./util/operationOptions"; +import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error"; +import { getParentSpan, OperationOptions } from "./util/operationOptions"; /** * The `EventHubProducerClient` class is used to send events to an Event Hub. @@ -42,10 +47,9 @@ export class EventHubProducerClient { */ private _clientOptions: EventHubClientOptions; /** - * Map of partitionId to producers + * Map of partitionId to senders */ - private _producersMap: Map; - + private _sendersMap: Map; /** * @property * @readonly @@ -135,7 +139,7 @@ export class EventHubProducerClient { this._clientOptions = options4 || {}; } - this._producersMap = new Map(); + this._sendersMap = new Map(); } /** @@ -152,23 +156,43 @@ export class EventHubProducerClient { * @throws Error if the underlying connection has been closed, create a new EventHubProducerClient. * @throws AbortError if the operation is cancelled via the abortSignal in the options. */ - async createBatch(options?: CreateBatchOptions): Promise { + async createBatch(options: CreateBatchOptions = {}): Promise { throwErrorIfConnectionClosed(this._context); - if (options && options.partitionId && options.partitionKey) { + if (options.partitionId != undefined && options.partitionKey != undefined) { throw new Error("partitionId and partitionKey cannot both be set when creating a batch"); } - let producer = this._producersMap.get(""); - - if (!producer) { - producer = new EventHubProducer(this._context, { - retryOptions: this._clientOptions.retryOptions - }); - this._producersMap.set("", producer); + let sender = this._sendersMap.get(""); + if (!sender) { + sender = EventHubSender.create(this._context); + this._sendersMap.set("", sender); } - return producer.createBatch(options); + let maxMessageSize = await sender.getMaxMessageSize({ + retryOptions: this._clientOptions.retryOptions, + abortSignal: options.abortSignal + }); + + if (options.maxSizeInBytes) { + if (options.maxSizeInBytes > maxMessageSize) { + const error = new Error( + `Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link.` + ); + logger.warning( + `[${this._context.connectionId}] ${error.message}` + ); + logErrorStackTrace(error); + throw error; + } + maxMessageSize = options.maxSizeInBytes; + } + return new EventDataBatchImpl( + this._context, + maxMessageSize, + options.partitionKey, + options.partitionId + ); } /** @@ -206,9 +230,14 @@ export class EventHubProducerClient { options: SendBatchOptions | OperationOptions = {} ): Promise { throwErrorIfConnectionClosed(this._context); + throwTypeErrorIfParameterMissing(this._context.connectionId, "sendBatch", "batch", batch); let partitionId: string | undefined; let partitionKey: string | undefined; + + // link message span contexts + let spanContextsToLink: SpanContext[] = []; + if (isEventDataBatch(batch)) { // For batches, partitionId and partitionKey would be set on the batch. partitionId = batch.partitionId; @@ -224,32 +253,72 @@ export class EventHubProducerClient { `The partitionId (${unexpectedOptions.partitionId}) set on sendBatch does not match the partitionId (${partitionId}) set when creating the batch.` ); } + + spanContextsToLink = batch._messageSpanContexts; } else { + if (!Array.isArray(batch)) { + batch = [batch]; + } + // For arrays of events, partitionId and partitionKey would be set in the options. const expectedOptions = options as SendBatchOptions; partitionId = expectedOptions.partitionId; partitionKey = expectedOptions.partitionKey; + + for (let i = 0; i < batch.length; i++) { + const event = batch[i]; + if (!event.properties || !event.properties[TRACEPARENT_PROPERTY]) { + const messageSpan = createMessageSpan(getParentSpan(options.tracingOptions)); + // since these message spans are created from same context as the send span, + // these message spans don't need to be linked. + // replace the original event with the instrumented one + batch[i] = instrumentEventData(batch[i], messageSpan); + messageSpan.end(); + } + } } - if (partitionId && partitionKey) { + if (partitionId != undefined && partitionKey != undefined) { throw new Error( `The partitionId (${partitionId}) and partitionKey (${partitionKey}) cannot both be specified.` ); } - if (!partitionId) { - // The producer map requires that partitionId be a string. - partitionId = ""; + if (partitionId != undefined) { + partitionId = String(partitionId) + } + if (partitionKey != undefined) { + partitionKey = String(partitionKey) + } + + let sender = this._sendersMap.get(partitionId || ""); + if (!sender) { + sender = EventHubSender.create(this._context, partitionId); + this._sendersMap.set(partitionId || "", sender); } - let producer = this._producersMap.get(partitionId); - if (!producer) { - producer = new EventHubProducer(this._context, { - retryOptions: this._clientOptions.retryOptions, - partitionId: partitionId === "" ? undefined : partitionId + const sendSpan = this._createSendSpan( + getParentSpan(options.tracingOptions), + spanContextsToLink + ); + + try { + const result = await sender.send(batch, { + ...options, + partitionId, + partitionKey, + retryOptions: this._clientOptions.retryOptions }); - this._producersMap.set(partitionId, producer); + sendSpan.setStatus({ code: CanonicalCode.OK }); + return result; + } catch (error) { + sendSpan.setStatus({ + code: CanonicalCode.UNKNOWN, + message: error.message + }); + throw error; + } finally { + sendSpan.end(); } - return producer.send(batch, options); } /** @@ -261,10 +330,10 @@ export class EventHubProducerClient { async close(): Promise { await this._context.close(); - for (const pair of this._producersMap) { + for (const pair of this._sendersMap) { await pair[1].close(); } - this._producersMap.clear(); + this._sendersMap.clear(); } /** @@ -317,4 +386,27 @@ export class EventHubProducerClient { retryOptions: this._clientOptions.retryOptions }); } + + private _createSendSpan( + parentSpan?: Span | SpanContext | null, + spanContextsToLink: SpanContext[] = [] + ): Span { + const links: Link[] = spanContextsToLink.map((context) => { + return { + context + }; + }); + const tracer = getTracer(); + const span = tracer.startSpan("Azure.EventHubs.send", { + kind: SpanKind.CLIENT, + parent: parentSpan, + links + }); + + span.setAttribute("az.namespace", "Microsoft.EventHub"); + span.setAttribute("message_bus.destination", this._context.config.entityPath); + span.setAttribute("peer.address", this._context.config.host); + + return span; + } } diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index d7a7337627ee..5ef7ac5ddbef 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -297,38 +297,6 @@ export class EventHubSender extends LinkEntity { options?: SendOptions & EventHubProducerOptions ): Promise { try { - // throw an error if partition key and partition id are both defined - if ( - options && - typeof options.partitionKey === "string" && - typeof options.partitionId === "string" - ) { - const error = new Error( - "Partition key is not supported when using producers that were created using a partition id." - ); - logger.warning( - "[%s] Partition key is not supported when using producers that were created using a partition id. %O", - this._context.connectionId, - error - ); - logErrorStackTrace(error); - throw error; - } - - // throw an error if partition key is different than the one provided in the options. - if (isEventDataBatch(events) && options && options.partitionKey) { - const error = new Error( - "Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead." - ); - logger.warning( - "[%s] Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead. %O", - this._context.connectionId, - error - ); - logErrorStackTrace(error); - throw error; - } - logger.info( "[%s] Sender '%s', trying to send EventData[].", this._context.connectionId, @@ -337,8 +305,16 @@ export class EventHubSender extends LinkEntity { let encodedBatchMessage: Buffer | undefined; if (isEventDataBatch(events)) { + if (events.count === 0) { + logger.info(`[${this._context.connectionId}] Empty batch was passsed. No events to send.`); + return; + } encodedBatchMessage = events._generateMessage(); } else { + if (events.length === 0) { + logger.info(`[${this._context.connectionId}] Empty array was passed. No events to send.`); + return; + } const partitionKey = (options && options.partitionKey) || undefined; const messages: AmqpMessage[] = []; // Convert EventData to AmqpMessage. diff --git a/sdk/eventhub/event-hubs/src/sender.ts b/sdk/eventhub/event-hubs/src/sender.ts deleted file mode 100644 index 270d20e1b79f..000000000000 --- a/sdk/eventhub/event-hubs/src/sender.ts +++ /dev/null @@ -1,283 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -import { EventData } from "./eventData"; -import { EventHubSender } from "./eventHubSender"; -import { EventHubProducerOptions } from "../src/models/private"; -import { CreateBatchOptions, SendOptions } from "../src/models/public"; -import { ConnectionContext } from "./connectionContext"; -import { logErrorStackTrace, logger } from "./log"; -import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error"; -import { EventDataBatch, EventDataBatchImpl, isEventDataBatch } from "./eventDataBatch"; -import { getTracer } from "@azure/core-tracing"; -import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api"; -import { TRACEPARENT_PROPERTY, instrumentEventData } from "./diagnostics/instrumentEventData"; -import { createMessageSpan } from "./diagnostics/messageSpan"; -import { getParentSpan } from "./util/operationOptions"; - -/** - * A producer responsible for sending events to an Event Hub. - * To create a producer use the `createProducer()` method on your `EventHubClient`. - * You can pass the below in the `options` when creating a producer. - * - `partitionId` : The identifier of the partition that the producer can be bound to. - * - `retryOptions` : The retry options used to govern retry attempts when an issue is encountered while sending events. - * A simple usage can be `{ "maxRetries": 4 }`. - * - * If `partitionId` is specified when creating a producer, all event data sent using the producer - * will be sent to the specified partition. - * Otherwise, they are automatically routed to an available partition by the Event Hubs service. - * - * Automatic routing of partitions is recommended because: - * - The sending of events will be highly available. - * - The event data will be evenly distributed among all available partitions. - * - * @class - * @internal - * @ignore - */ -export class EventHubProducer { - /** - * @property Describes the amqp connection context for the Client. - */ - private _context: ConnectionContext; - /** - * @property Denotes if close() was called on this sender - */ - private _isClosed: boolean = false; - - private _senderOptions: EventHubProducerOptions; - - private _eventHubSender: EventHubSender | undefined; - - private _eventHubName: string; - private _fullyQualifiedNamespace: string; - - /** - * @property Returns `true` if either the producer or the client that created it has been closed. - * @readonly - */ - public get isClosed(): boolean { - return this._isClosed || this._context.wasConnectionCloseCalled; - } - - /** - * EventHubProducer should not be constructed using `new EventHubProduer()` - * Use the `createProducer()` method on your `EventHubClient` instead. - * @constructor - * @internal - * @ignore - */ - constructor( - context: ConnectionContext, - options?: EventHubProducerOptions - ) { - this._context = context; - this._senderOptions = options || {}; - const partitionId = - this._senderOptions.partitionId != undefined - ? String(this._senderOptions.partitionId) - : undefined; - this._eventHubSender = EventHubSender.create(this._context, partitionId); - this._eventHubName = this._context.config.entityPath; - this._fullyQualifiedNamespace = this._context.config.host; - } - - /** - * Creates an instance of `EventDataBatch` to which one can add events until the maximum supported size is reached. - * The batch can be passed to the `send()` method of the `EventHubProducer` to be sent to Azure Event Hubs. - * @param options A set of options to configure the behavior of the batch. - * - `partitionKey` : A value that is hashed to produce a partition assignment. - * Not applicable if the `EventHubProducer` was created using a `partitionId`. - * - `maxSizeInBytes`: The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached. - * - `abortSignal` : A signal the request to cancel the send operation. - * @returns Promise - */ - async createBatch(options?: CreateBatchOptions): Promise { - this._throwIfSenderOrConnectionClosed(); - if (!options) { - options = {}; - } - // throw an error if partition key and partition id are both defined - if ( - typeof options.partitionKey === "string" && - typeof this._senderOptions.partitionId === "string" - ) { - const error = new Error( - "Creating a batch with partition key is not supported when using producers that were created using a partition id." - ); - logger.warning( - "[%s] Creating a batch with partition key is not supported when using producers that were created using a partition id. %O", - this._context.connectionId, - error - ); - logErrorStackTrace(error); - throw error; - } - - let maxMessageSize = await this._eventHubSender!.getMaxMessageSize({ - retryOptions: this._senderOptions.retryOptions, - abortSignal: options.abortSignal - }); - if (options.maxSizeInBytes) { - if (options.maxSizeInBytes > maxMessageSize) { - const error = new Error( - `Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link.` - ); - logger.warning( - `[${this._context.connectionId}] Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link. ${error}` - ); - logErrorStackTrace(error); - throw error; - } - maxMessageSize = options.maxSizeInBytes; - } - return new EventDataBatchImpl( - this._context, - maxMessageSize, - options.partitionKey, - options.partitionId - ); - } - - /** - * Send events to the associated Event Hub. - * - * @param eventData An array of `EventData` objects or an - * instance of `EventDataBatch`. - * @param options The set of options that can be specified to influence the way in which - * events are sent to the associated Event Hub. - * - `partitionKey` : A value that is hashed to produce a partition assignment. - * Not applicable if the `EventHubProducer` was created using a `partitionId`. - * - `abortSignal` : A signal the request to cancel the send operation. - * - * @returns Promise - * @throws AbortError if the operation is cancelled via the abortSignal. - * @throws MessagingError if an error is encountered while sending a message. - * @throws TypeError if a required parameter is missing. - * @throws Error if the underlying connection or sender has been closed. - * @throws Error if a partitionKey is provided when the producer was created with a partitionId. - * @throws Error if batch was created with partitionKey different than the one provided in the options. - * Create a new producer using the EventHubClient createProducer method. - */ - async send( - eventData: EventData[] | EventDataBatch, - options: SendOptions = {} - ): Promise { - this._throwIfSenderOrConnectionClosed(); - throwTypeErrorIfParameterMissing(this._context.connectionId, "send", "eventData", eventData); - if (Array.isArray(eventData) && eventData.length === 0) { - logger.info(`[${this._context.connectionId}] Empty array was passed. No events to send.`); - return; - } - if (isEventDataBatch(eventData) && eventData.count === 0) { - logger.info(`[${this._context.connectionId}] Empty batch was passsed. No events to send.`); - return; - } - if (!Array.isArray(eventData) && !isEventDataBatch(eventData)) { - eventData = [eventData]; - } - - // link message span contexts - let spanContextsToLink: SpanContext[] = []; - if (Array.isArray(eventData)) { - for (let i = 0; i < eventData.length; i++) { - const event = eventData[i]; - if (!event.properties || !event.properties[TRACEPARENT_PROPERTY]) { - const messageSpan = createMessageSpan(getParentSpan(options.tracingOptions)); - // since these message spans are created from same context as the send span, - // these message spans don't need to be linked. - // replace the original event with the instrumented one - eventData[i] = instrumentEventData(eventData[i], messageSpan); - messageSpan.end(); - } - } - } else if (isEventDataBatch(eventData)) { - spanContextsToLink = eventData._messageSpanContexts; - } - - const sendSpan = this._createSendSpan( - getParentSpan(options.tracingOptions), - spanContextsToLink - ); - - try { - const result = await this._eventHubSender!.send(eventData, { - ...this._senderOptions, - ...options - }); - sendSpan.setStatus({ code: CanonicalCode.OK }); - return result; - } catch (err) { - sendSpan.setStatus({ - code: CanonicalCode.UNKNOWN, - message: err.message - }); - throw err; - } finally { - sendSpan.end(); - } - } - - /** - * Closes the underlying AMQP sender link. - * Once closed, the producer cannot be used for any further operations. - * Use the `createProducer` function on the EventHubClient to instantiate a new EventHubProducer. - * - * @returns - * @throws Error if the underlying connection encounters an error while closing. - */ - async close(): Promise { - try { - if (this._context.connection && this._context.connection.isOpen() && this._eventHubSender) { - await this._eventHubSender.close(); - this._eventHubSender = undefined; - } - this._isClosed = true; - } catch (err) { - logger.warning( - "[%s] An error occurred while closing the Sender for %s: %O", - this._context.connectionId, - this._context.config.entityPath, - err - ); - logErrorStackTrace(err); - throw err; - } - } - - private _createSendSpan( - parentSpan?: Span | SpanContext | null, - spanContextsToLink: SpanContext[] = [] - ): Span { - const links: Link[] = spanContextsToLink.map((context) => { - return { - context - }; - }); - const tracer = getTracer(); - const span = tracer.startSpan("Azure.EventHubs.send", { - kind: SpanKind.CLIENT, - parent: parentSpan, - links - }); - - span.setAttribute("az.namespace", "Microsoft.EventHub"); - span.setAttribute("message_bus.destination", this._eventHubName); - span.setAttribute("peer.address", this._fullyQualifiedNamespace); - - return span; - } - - private _throwIfSenderOrConnectionClosed(): void { - throwErrorIfConnectionClosed(this._context); - if (this.isClosed) { - const errorMessage = - `The EventHubProducer for "${this._context.config.entityPath}" has been closed and can no longer be used. ` + - `Please create a new EventHubProducer using the "createProducer" function on the EventHubClient.`; - const error = new Error(errorMessage); - logger.warning(`[${this._context.connectionId}] %O`, error); - logErrorStackTrace(error); - throw error; - } - } -} diff --git a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts index b46d69cbea32..6c73c86cf384 100644 --- a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts +++ b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts @@ -133,7 +133,7 @@ describe("disconnected", function() { const originalConnectionId = clientConnectionContext.connectionId; // We need to dig deep into the internals to get the awaitable sender so that . - const awaitableSender = client["_producersMap"].get("")!["_eventHubSender"]!["_sender"]!; + const awaitableSender = client["_sendersMap"].get("")!["_sender"]!; let thirdSend: Promise; // Change the timeout on the awaitableSender so it forces an OperationTimeoutError diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 9f584ad01a38..90dc0ef3785e 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -62,10 +62,9 @@ describe("EventHub Sender", function(): void { debug("Closing the clients.."); await producerClient.close(); await consumerClient.close(); - }) + }); describe("Create batch", function(): void { - describe("tryAdd", function() { it("doesn't grow if invalid events are added", async () => { const batch = await producerClient.createBatch({ maxSizeInBytes: 20 }); @@ -84,6 +83,41 @@ describe("EventHub Sender", function(): void { }); }); + it("partitionId is set as expected", async () => { + const batch = await producerClient.createBatch({ + partitionId: "0" + }); + should.equal(batch.partitionId, "0"); + }); + + it("partitionId is set as expected when it is 0 i.e. falsy", async () => { + const batch = await producerClient.createBatch({ + //@ts-expect-error + partitionId: 0 + }); + should.equal(batch.partitionId, "0"); + }); + + it("partitionKey is set as expected", async () => { + const batch = await producerClient.createBatch({ + partitionKey: "boo" + }); + should.equal(batch.partitionKey, "boo"); + }); + + it("partitionKey is set as expected when it is 0 i.e. falsy", async () => { + const batch = await producerClient.createBatch({ + //@ts-expect-error + partitionKey: 0 + }); + should.equal(batch.partitionKey, "0"); + }); + + it("maxSizeInBytes is set as expected", async () => { + const batch = await producerClient.createBatch({ maxSizeInBytes: 30 }); + should.equal(batch.maxSizeInBytes, 30); + }) + it("should be sent successfully", async function(): Promise { const list = ["Albert", `${Buffer.from("Mike".repeat(1300000))}`, "Marie"]; @@ -125,6 +159,88 @@ describe("EventHub Sender", function(): void { ); }); + it("should be sent successfully when partitionId is 0 i.e. falsy", async function(): Promise< + void + > { + const list = ["Albert", "Marie"]; + + const batch = await producerClient.createBatch({ + //@ts-expect-error + partitionId: 0 + }); + + batch.partitionId!.should.equal("0"); + should.not.exist(batch.partitionKey); + batch.maxSizeInBytes.should.be.gt(0); + + batch.tryAdd({ body: list[0] }).should.be.ok; + batch.tryAdd({ body: list[1] }).should.be.ok; + + const { + subscriptionEventHandler, + startPosition + } = await SubscriptionHandlerForTests.startingFromHere(producerClient); + + const subscriber = consumerClient.subscribe("0", subscriptionEventHandler, { + startPosition + }); + await producerClient.sendBatch(batch); + + let receivedEvents; + + try { + receivedEvents = await subscriptionEventHandler.waitForEvents(["0"], 2); + } finally { + await subscriber.close(); + } + + list.should.be.deep.eq( + receivedEvents.map((event) => event.body), + "Received messages should be equal to our sent messages" + ); + }); + + it("should be sent successfully when partitionKey is 0 i.e. falsy", async function(): Promise< + void + > { + const list = ["Albert", "Marie"]; + + const batch = await producerClient.createBatch({ + //@ts-expect-error + partitionKey: 0 + }); + + batch.partitionKey!.should.equal("0"); + should.not.exist(batch.partitionId); + batch.maxSizeInBytes.should.be.gt(0); + + batch.tryAdd({ body: list[0] }).should.be.ok; + batch.tryAdd({ body: list[1] }).should.be.ok; + + const { + subscriptionEventHandler, + startPosition + } = await SubscriptionHandlerForTests.startingFromHere(producerClient); + + const subscriber = consumerClient.subscribe(subscriptionEventHandler, { + startPosition + }); + await producerClient.sendBatch(batch); + + let receivedEvents; + const allPartitionIds = await producerClient.getPartitionIds(); + try { + receivedEvents = await subscriptionEventHandler.waitForEvents(allPartitionIds, 2); + } finally { + await subscriber.close(); + } + + list.should.be.deep.eq( + receivedEvents.map((event) => event.body), + "Received messages should be equal to our sent messages" + ); + }); + it("should be sent successfully with properties", async function(): Promise { const properties = { test: "super" }; const list = [ @@ -361,26 +477,6 @@ describe("EventHub Sender", function(): void { eventDataBatch.count.should.equal(1); }); - it("should throw when maxMessageSize is greater than maximum message size on the AMQP sender link", async function(): Promise< - void - > { - const newClient: EventHubProducerClient = new EventHubProducerClient( - service.connectionString, - service.path - ); - - try { - await newClient.createBatch({ maxSizeInBytes: 2046528 }); - throw new Error("Test Failure"); - } catch (err) { - err.message.should.match( - /.*Max message size \((\d+) bytes\) is greater than maximum message size \((\d+) bytes\) on the AMQP sender link.*/gi - ); - } finally { - await newClient.close(); - } - }); - // TODO: Enable this test https://github.com/Azure/azure-sdk-for-js/issues/9202 is fixed it.skip("should support being cancelled", async function(): Promise { try { @@ -409,8 +505,7 @@ describe("EventHub Sender", function(): void { }); }); - describe("Multiple messages", function(): void { - + describe("Multiple sendBatch calls", function(): void { it("should be sent successfully in parallel", async function(): Promise { const { subscriptionEventHandler, @@ -419,7 +514,7 @@ describe("EventHub Sender", function(): void { const promises = []; for (let i = 0; i < 5; i++) { - promises.push(producerClient.sendBatch([{body: `Hello World ${i}`}])); + promises.push(producerClient.sendBatch([{ body: `Hello World ${i}` }])); } await Promise.all(promises); @@ -457,7 +552,7 @@ describe("EventHub Sender", function(): void { try { const promises = []; for (let i = 0; i < senderCount; i++) { - promises.push(producerClient.sendBatch([{body: `Hello World ${i}`}])); + promises.push(producerClient.sendBatch([{ body: `Hello World ${i}` }])); } await Promise.all(promises); } catch (err) { @@ -466,7 +561,7 @@ describe("EventHub Sender", function(): void { } }); - it("should be sent successfully in parallel by multiple senders", async function(): Promise< + it("should be sent successfully in parallel by multiple clients", async function(): Promise< void > { const senderCount = 3; @@ -475,13 +570,17 @@ describe("EventHub Sender", function(): void { for (let i = 0; i < senderCount; i++) { if (i === 0) { debug(">>>>> Sending a message to partition %d", i); - promises.push(await producerClient.sendBatch([{body: `Hello World ${i}`}], {partitionId: "0"})); + promises.push( + await producerClient.sendBatch([{ body: `Hello World ${i}` }], { partitionId: "0" }) + ); } else if (i === 1) { debug(">>>>> Sending a message to partition %d", i); - promises.push(await producerClient.sendBatch([{body: `Hello World ${i}`}], {partitionId: "1"})); + promises.push( + await producerClient.sendBatch([{ body: `Hello World ${i}` }], { partitionId: "1" }) + ); } else { debug(">>>>> Sending a message to the hub when i == %d", i); - promises.push(await producerClient.sendBatch([{body: `Hello World ${i}`}])); + promises.push(await producerClient.sendBatch([{ body: `Hello World ${i}` }])); } } await Promise.all(promises); @@ -638,7 +737,6 @@ describe("EventHub Sender", function(): void { }); describe("Array of events", function() { - it("should be sent successfully", async () => { const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; const receivedEvents: ReceivedEventData[] = []; @@ -920,132 +1018,218 @@ describe("EventHub Sender", function(): void { }); describe("Validation", function() { - describe("sendBatch", function() { - describe("with EventDataBatch", function() { - it("works if partitionKeys match", async () => { - const misconfiguredOptions: SendBatchOptions = { - partitionKey: "foo" - }; - const batch = await producerClient.createBatch({ partitionKey: "foo" }); - await producerClient.sendBatch(batch, misconfiguredOptions); - }); - it("works if partitionIds match", async () => { - const misconfiguredOptions: SendBatchOptions = { - partitionId: "0" - }; - const batch = await producerClient.createBatch({ partitionId: "0" }); - await producerClient.sendBatch(batch, misconfiguredOptions); - }); - it("throws an error if partitionKeys don't match", async () => { - const badOptions: SendBatchOptions = { - partitionKey: "bar" - }; - const batch = await producerClient.createBatch({ partitionKey: "foo" }); - try { - await producerClient.sendBatch(batch, badOptions); - throw new Error("Test failure"); - } catch (err) { - err.message.should.equal( - "The partitionKey (bar) set on sendBatch does not match the partitionKey (foo) set when creating the batch." - ); - } - }); - it("throws an error if partitionKeys don't match (undefined)", async () => { - const badOptions: SendBatchOptions = { - partitionKey: "bar" - }; - const batch = await producerClient.createBatch(); - try { - await producerClient.sendBatch(batch, badOptions); - throw new Error("Test failure"); - } catch (err) { - err.message.should.equal( - "The partitionKey (bar) set on sendBatch does not match the partitionKey (undefined) set when creating the batch." - ); - } - }); - it("throws an error if partitionIds don't match", async () => { - const badOptions: SendBatchOptions = { - partitionId: "0" - }; - const batch = await producerClient.createBatch({ partitionId: "1" }); - try { - await producerClient.sendBatch(batch, badOptions); - throw new Error("Test failure"); - } catch (err) { - err.message.should.equal( - "The partitionId (0) set on sendBatch does not match the partitionId (1) set when creating the batch." - ); - } - }); - it("throws an error if partitionIds don't match (undefined)", async () => { - const badOptions: SendBatchOptions = { - partitionId: "0" - }; - const batch = await producerClient.createBatch(); - try { - await producerClient.sendBatch(batch, badOptions); - throw new Error("Test failure"); - } catch (err) { - err.message.should.equal( - "The partitionId (0) set on sendBatch does not match the partitionId (undefined) set when creating the batch." - ); - } - }); - it("throws an error if partitionId and partitionKey are set (create, send)", async () => { - const badOptions: SendBatchOptions = { - partitionKey: "foo" - }; - const batch = await producerClient.createBatch({ partitionId: "0" }); - try { - await producerClient.sendBatch(batch, badOptions); - throw new Error("Test failure"); - } catch (err) { - err.message.should.not.equal("Test failure"); - } - }); - it("throws an error if partitionId and partitionKey are set (send, create)", async () => { - const badOptions: SendBatchOptions = { - partitionId: "0" - }; - const batch = await producerClient.createBatch({ partitionKey: "foo" }); - try { - await producerClient.sendBatch(batch, badOptions); - throw new Error("Test failure"); - } catch (err) { - err.message.should.not.equal("Test failure"); - } - }); - it("throws an error if partitionId and partitionKey are set (send, send)", async () => { - const badOptions: SendBatchOptions = { - partitionKey: "foo", - partitionId: "0" - }; - const batch = await producerClient.createBatch(); - try { - await producerClient.sendBatch(batch, badOptions); - throw new Error("Test failure"); - } catch (err) { - err.message.should.not.equal("Test failure"); - } - }); + describe("createBatch", function() { + it("throws an error if partitionId and partitionKey are set", async () => { + try { + await producerClient.createBatch({ partitionId: "0", partitionKey: "boo" }); + throw new Error("Test failure"); + } catch (error) { + error.message.should.equal( + "partitionId and partitionKey cannot both be set when creating a batch" + ); + } }); - describe("with events array", function() { - it("throws an error if partitionId and partitionKey are set", async () => { - const badOptions: SendBatchOptions = { - partitionKey: "foo", - partitionId: "0" - }; - const batch = [{ body: "Hello 1" }, { body: "Hello 2" }]; - try { - await producerClient.sendBatch(batch, badOptions); - throw new Error("Test failure"); - } catch (err) { - err.message.should.equal( - "The partitionId (0) and partitionKey (foo) cannot both be specified." - ); - } - }); + + it("throws an error if partitionId and partitionKey are set and partitionId is 0 i.e. falsy", async () => { + try { + await producerClient.createBatch({ + //@ts-expect-error + partitionId: 0, + partitionKey: "boo" + }); + throw new Error("Test failure"); + } catch (error) { + error.message.should.equal( + "partitionId and partitionKey cannot both be set when creating a batch" + ); + } + }); + + it("throws an error if partitionId and partitionKey are set and partitionKey is 0 i.e. falsy", async () => { + try { + await producerClient.createBatch({ + partitionId: "1", + //@ts-expect-error + partitionKey: 0 + }); + throw new Error("Test failure"); + } catch (error) { + error.message.should.equal( + "partitionId and partitionKey cannot both be set when creating a batch" + ); + } + }); + + it("should throw when maxMessageSize is greater than maximum message size on the AMQP sender link", async function(): Promise< + void + > { + try { + await producerClient.createBatch({ maxSizeInBytes: 2046528 }); + throw new Error("Test Failure"); + } catch (err) { + err.message.should.match( + /.*Max message size \((\d+) bytes\) is greater than maximum message size \((\d+) bytes\) on the AMQP sender link.*/gi + ); + } + }); + }); + describe("sendBatch with EventDataBatch", function() { + it("works if partitionKeys match", async () => { + const misconfiguredOptions: SendBatchOptions = { + partitionKey: "foo" + }; + const batch = await producerClient.createBatch({ partitionKey: "foo" }); + await producerClient.sendBatch(batch, misconfiguredOptions); + }); + it("works if partitionIds match", async () => { + const misconfiguredOptions: SendBatchOptions = { + partitionId: "0" + }; + const batch = await producerClient.createBatch({ partitionId: "0" }); + await producerClient.sendBatch(batch, misconfiguredOptions); + }); + it("throws an error if partitionKeys don't match", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "bar" + }; + const batch = await producerClient.createBatch({ partitionKey: "foo" }); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionKey (bar) set on sendBatch does not match the partitionKey (foo) set when creating the batch." + ); + } + }); + it("throws an error if partitionKeys don't match (undefined)", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "bar" + }; + const batch = await producerClient.createBatch(); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionKey (bar) set on sendBatch does not match the partitionKey (undefined) set when creating the batch." + ); + } + }); + it("throws an error if partitionIds don't match", async () => { + const badOptions: SendBatchOptions = { + partitionId: "0" + }; + const batch = await producerClient.createBatch({ partitionId: "1" }); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionId (0) set on sendBatch does not match the partitionId (1) set when creating the batch." + ); + } + }); + it("throws an error if partitionIds don't match (undefined)", async () => { + const badOptions: SendBatchOptions = { + partitionId: "0" + }; + const batch = await producerClient.createBatch(); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionId (0) set on sendBatch does not match the partitionId (undefined) set when creating the batch." + ); + } + }); + it("throws an error if partitionId and partitionKey are set (create, send)", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "foo" + }; + const batch = await producerClient.createBatch({ partitionId: "0" }); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.not.equal("Test failure"); + } + }); + it("throws an error if partitionId and partitionKey are set (send, create)", async () => { + const badOptions: SendBatchOptions = { + partitionId: "0" + }; + const batch = await producerClient.createBatch({ partitionKey: "foo" }); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.not.equal("Test failure"); + } + }); + it("throws an error if partitionId and partitionKey are set (send, send)", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "foo", + partitionId: "0" + }; + const batch = await producerClient.createBatch(); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.not.equal("Test failure"); + } + }); + }); + + describe("sendBatch with EventDataBatch with events array", function() { + it("throws an error if partitionId and partitionKey are set", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "foo", + partitionId: "0" + }; + const batch = [{ body: "Hello 1" }, { body: "Hello 2" }]; + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionId (0) and partitionKey (foo) cannot both be specified." + ); + } + }); + it("throws an error if partitionId and partitionKey are set with partitionId set to 0 i.e. falsy", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "foo", + //@ts-expect-error + partitionId: 0 + }; + const batch = [{ body: "Hello 1" }, { body: "Hello 2" }]; + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionId (0) and partitionKey (foo) cannot both be specified." + ); + } + }); + it("throws an error if partitionId and partitionKey are set with partitionKey set to 0 i.e. falsy", async () => { + const badOptions: SendBatchOptions = { + //@ts-expect-error + partitionKey: 0, + partitionId: "0" + }; + const batch = [{ body: "Hello 1" }, { body: "Hello 2" }]; + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionId (0) and partitionKey (0) cannot both be specified." + ); + } }); }); }); @@ -1075,7 +1259,9 @@ describe("EventHub Sender", function(): void { it(`"${id}" should throw an error`, async function(): Promise { try { debug("Created sender and will be sending a message to partition id ...", id); - await producerClient.sendBatch([{ body: "Hello world!" }], { partitionId: id as any }); + await producerClient.sendBatch([{ body: "Hello world!" }], { + partitionId: id as any + }); debug("sent the message."); throw new Error("Test failure"); } catch (err) { @@ -1089,5 +1275,4 @@ describe("EventHub Sender", function(): void { }); }); }); - }).timeout(20000);