From b25f53d1e5ed0c77833220ed7ec598ce67645901 Mon Sep 17 00:00:00 2001 From: Ramya Rao Date: Mon, 22 Jun 2020 18:04:09 -0700 Subject: [PATCH] [Event Hub] Update ConsumerClient, Processor, PumpManager & Pump to not use EventHubClient (#9600) --- .../event-hubs/review/event-hubs.api.md | 4 +- .../event-hubs/src/connectionContext.ts | 77 +- .../event-hubs/src/eventHubConsumerClient.ts | 99 +- .../event-hubs/src/eventHubProducerClient.ts | 3 +- .../event-hubs/src/eventHubReceiver.ts | 2 +- sdk/eventhub/event-hubs/src/eventProcessor.ts | 40 +- .../event-hubs/src/impl/eventHubClient.ts | 317 ----- sdk/eventhub/event-hubs/src/models/private.ts | 54 + sdk/eventhub/event-hubs/src/partitionPump.ts | 18 +- sdk/eventhub/event-hubs/src/pumpManager.ts | 10 +- sdk/eventhub/event-hubs/src/receiver.ts | 142 +-- sdk/eventhub/event-hubs/test/client.spec.ts | 4 +- .../test/eventHubConsumerClient.spec.ts | 8 +- .../event-hubs/test/eventProcessor.spec.ts | 183 ++- .../event-hubs/test/node/disconnects.spec.ts | 4 +- sdk/eventhub/event-hubs/test/receiver.spec.ts | 1032 +++++++---------- 16 files changed, 698 insertions(+), 1299 deletions(-) delete mode 100644 sdk/eventhub/event-hubs/src/impl/eventHubClient.ts diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index e327fa02b18b..0eae6f1289f4 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -92,7 +92,7 @@ export class EventHubConsumerClient { get eventHubName(): string; get fullyQualifiedNamespace(): string; getEventHubProperties(options?: GetEventHubPropertiesOptions): Promise; - getPartitionIds(options?: GetPartitionIdsOptions): Promise; + getPartitionIds(options?: GetPartitionIdsOptions): Promise>; getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise; subscribe(handlers: SubscriptionEventHandlers, options?: SubscribeOptions): Subscription; subscribe(partitionId: string, handlers: SubscriptionEventHandlers, options?: SubscribeOptions): Subscription; @@ -112,7 +112,7 @@ export class EventHubProducerClient { getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise; sendBatch(batch: EventData[], options?: SendBatchOptions): Promise; sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise; -} + } // @public export interface EventHubProperties { diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index f6c61591dc0d..1f581b936bb7 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -15,7 +15,11 @@ import { CreateConnectionContextBaseParameters, EventHubConnectionConfig, SharedKeyCredential, - TokenCredential + TokenCredential, + isTokenCredential, + parseConnectionString, + EventHubConnectionStringModel, + ConnectionConfig } from "@azure/core-amqp"; import { ManagementClient, ManagementClientOptions } from "./managementClient"; import { EventHubClientOptions } from "./models/public"; @@ -426,3 +430,74 @@ export namespace ConnectionContext { return connectionContext; } } + +/** + * Helper method to create a ConnectionContext from the input passed to either + * EventHubProducerClient or EventHubConsumerClient constructors + * + * @ignore + * @internal + */ +export function createConnectionContext( + hostOrConnectionString: string, + eventHubNameOrOptions?: string | EventHubClientOptions, + credentialOrOptions?: TokenCredential | EventHubClientOptions, + options?: EventHubClientOptions +): ConnectionContext { + let connectionString; + let config; + let credential: TokenCredential | SharedKeyCredential; + hostOrConnectionString = String(hostOrConnectionString); + + if (!isTokenCredential(credentialOrOptions)) { + const parsedCS = parseConnectionString(hostOrConnectionString); + if ( + !(parsedCS.EntityPath || (typeof eventHubNameOrOptions === "string" && eventHubNameOrOptions)) + ) { + throw new TypeError( + `Either provide "eventHubName" or the "connectionString": "${hostOrConnectionString}", ` + + `must contain "EntityPath=".` + ); + } + if ( + parsedCS.EntityPath && + typeof eventHubNameOrOptions === "string" && + eventHubNameOrOptions && + parsedCS.EntityPath !== eventHubNameOrOptions + ) { + throw new TypeError( + `The entity path "${parsedCS.EntityPath}" in connectionString: "${hostOrConnectionString}" ` + + `doesn't match with eventHubName: "${eventHubNameOrOptions}".` + ); + } + connectionString = hostOrConnectionString; + if (typeof eventHubNameOrOptions !== "string") { + // connectionstring and/or options were passed to constructor + config = EventHubConnectionConfig.create(connectionString); + options = eventHubNameOrOptions; + } else { + // connectionstring, eventHubName and/or options were passed to constructor + const eventHubName = eventHubNameOrOptions; + config = EventHubConnectionConfig.create(connectionString, eventHubName); + options = credentialOrOptions; + } + // Since connectionstring was passed, create a SharedKeyCredential + credential = new SharedKeyCredential(config.sharedAccessKeyName, config.sharedAccessKey); + } else { + // host, eventHubName, a TokenCredential and/or options were passed to constructor + const eventHubName = eventHubNameOrOptions; + let host = hostOrConnectionString; + credential = credentialOrOptions; + if (!eventHubName) { + throw new TypeError(`"eventHubName" is missing`); + } + + if (!host.endsWith("/")) host += "/"; + connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue;EntityPath=${eventHubName}`; + config = EventHubConnectionConfig.create(connectionString); + } + + ConnectionConfig.validate(config); + + return ConnectionContext.create(config, credential, options); +} diff --git a/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts b/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts index 5069ef75098e..4189cc64675b 100644 --- a/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { EventHubClient } from "./impl/eventHubClient"; +import { ConnectionContext, createConnectionContext } from "./connectionContext"; import { EventHubClientOptions, GetEventHubPropertiesOptions, @@ -51,7 +51,14 @@ const defaultConsumerClientOptions: Required.servicebus.windows.net. */ get fullyQualifiedNamespace(): string { - return this._eventHubClient.fullyQualifiedNamespace; + return this._context.config.host; } /** @@ -239,73 +246,65 @@ export class EventHubConsumerClient { // #3 or 3.1 logger.info("Creating EventHubConsumerClient with TokenCredential."); - let eventHubClientOptions: EventHubClientOptions | undefined; - if (isCheckpointStore(checkpointStoreOrOptions5)) { // 3.1 this._checkpointStore = checkpointStoreOrOptions5; this._userChoseCheckpointStore = true; - eventHubClientOptions = options6; + this._clientOptions = options6 || {}; } else { this._checkpointStore = new InMemoryCheckpointStore(); this._userChoseCheckpointStore = false; - eventHubClientOptions = checkpointStoreOrOptions5; + this._clientOptions = checkpointStoreOrOptions5 || {}; } - this._eventHubClient = new EventHubClient( + this._context = createConnectionContext( connectionStringOrFullyQualifiedNamespace2, checkpointStoreOrEventHubNameOrOptions3 as string, checkpointStoreOrCredentialOrOptions4, - eventHubClientOptions + this._clientOptions ); } else if (typeof checkpointStoreOrEventHubNameOrOptions3 === "string") { // #2 or 2.1 logger.info("Creating EventHubConsumerClient with connection string and event hub name."); - let eventHubClientOptions: EventHubClientOptions | undefined; - if (isCheckpointStore(checkpointStoreOrCredentialOrOptions4)) { // 2.1 this._checkpointStore = checkpointStoreOrCredentialOrOptions4; this._userChoseCheckpointStore = true; - eventHubClientOptions = checkpointStoreOrOptions5 as EventHubClientOptions | undefined; + this._clientOptions = (checkpointStoreOrOptions5 as EventHubClientOptions) || {}; } else { // 2 this._checkpointStore = new InMemoryCheckpointStore(); this._userChoseCheckpointStore = false; - eventHubClientOptions = checkpointStoreOrCredentialOrOptions4; + this._clientOptions = checkpointStoreOrCredentialOrOptions4 || {}; } - this._eventHubClient = new EventHubClient( + this._context = createConnectionContext( connectionStringOrFullyQualifiedNamespace2, checkpointStoreOrEventHubNameOrOptions3, - eventHubClientOptions as EventHubClientOptions + this._clientOptions ); } else { // #1 or 1.1 logger.info("Creating EventHubConsumerClient with connection string."); - let eventHubClientOptions: EventHubClientOptions | undefined; - if (isCheckpointStore(checkpointStoreOrEventHubNameOrOptions3)) { // 1.1 this._checkpointStore = checkpointStoreOrEventHubNameOrOptions3; this._userChoseCheckpointStore = true; - eventHubClientOptions = checkpointStoreOrCredentialOrOptions4 as - | EventHubClientOptions - | undefined; + this._clientOptions = + (checkpointStoreOrCredentialOrOptions4 as EventHubClientOptions) || {}; } else { // 1 this._checkpointStore = new InMemoryCheckpointStore(); this._userChoseCheckpointStore = false; - eventHubClientOptions = checkpointStoreOrEventHubNameOrOptions3 as - | EventHubClientOptions - | undefined; + this._clientOptions = + (checkpointStoreOrEventHubNameOrOptions3 as EventHubClientOptions) || {}; } - this._eventHubClient = new EventHubClient( + this._context = createConnectionContext( connectionStringOrFullyQualifiedNamespace2, - eventHubClientOptions + this._clientOptions ); } } @@ -324,8 +323,8 @@ export class EventHubConsumerClient { return subscription.close(); }) ); - // Close the connection via the client. - return this._eventHubClient.close(); + // Close the connection via the connection context. + return this._context.close(); } /** @@ -336,8 +335,15 @@ export class EventHubConsumerClient { * @throws Error if the underlying connection has been closed, create a new EventHubConsumerClient. * @throws AbortError if the operation is cancelled via the abortSignal. */ - getPartitionIds(options: GetPartitionIdsOptions = {}): Promise { - return this._eventHubClient.getPartitionIds(options); + getPartitionIds(options: GetPartitionIdsOptions = {}): Promise> { + return this._context + .managementSession!.getEventHubProperties({ + ...options, + retryOptions: this._clientOptions.retryOptions + }) + .then((eventHubProperties) => { + return eventHubProperties.partitionIds; + }); } /** @@ -352,7 +358,10 @@ export class EventHubConsumerClient { partitionId: string, options: GetPartitionPropertiesOptions = {} ): Promise { - return this._eventHubClient.getPartitionProperties(partitionId, options); + return this._context.managementSession!.getPartitionProperties(partitionId, { + ...options, + retryOptions: this._clientOptions.retryOptions + }); } /** @@ -363,7 +372,10 @@ export class EventHubConsumerClient { * @throws AbortError if the operation is cancelled via the abortSignal. */ getEventHubProperties(options: GetEventHubPropertiesOptions = {}): Promise { - return this._eventHubClient.getProperties(options); + return this._context.managementSession!.getEventHubProperties({ + ...options, + retryOptions: this._clientOptions.retryOptions + }); } /** @@ -421,9 +433,7 @@ export class EventHubConsumerClient { handlersOrPartitionId1, options )); - } else if ( - isSubscriptionEventHandlers(optionsOrHandlers2) - ) { + } else if (isSubscriptionEventHandlers(optionsOrHandlers2)) { // #2: subscribe overload (read from specific partition IDs), don't coordinate const options = possibleOptions3 as SubscribeOptions | undefined; if (options && options.startPosition) { @@ -471,8 +481,7 @@ export class EventHubConsumerClient { } const eventProcessor = this._createEventProcessor( - this._consumerGroup, - this._eventHubClient, + this._context, subscriptionEventHandlers, this._checkpointStore, { @@ -484,7 +493,8 @@ export class EventHubConsumerClient { : new GreedyPartitionLoadBalancer(), // make it so all the event processors process work with the same overarching owner ID // this allows the EventHubConsumer to unify all the work for any processors that it spawns - ownerId: this._id + ownerId: this._id, + retryOptions: this._clientOptions.retryOptions } ); @@ -511,15 +521,15 @@ export class EventHubConsumerClient { } const eventProcessor = this._createEventProcessor( - this._consumerGroup, - this._eventHubClient, + this._context, eventHandlers, this._checkpointStore, { ...defaultConsumerClientOptions, ...options, processingTarget: partitionId, - ownerLevel: getOwnerLevel(subscribeOptions, this._userChoseCheckpointStore) + ownerLevel: getOwnerLevel(subscribeOptions, this._userChoseCheckpointStore), + retryOptions: this._clientOptions.retryOptions } ); @@ -527,15 +537,14 @@ export class EventHubConsumerClient { } private _createEventProcessor( - consumerGroup: string, - eventHubClient: EventHubClient, + connectionContext: ConnectionContext, subscriptionEventHandlers: SubscriptionEventHandlers, checkpointStore: CheckpointStore, options: FullEventProcessorOptions ) { return new EventProcessor( - consumerGroup, - eventHubClient, + this._consumerGroup, + connectionContext, subscriptionEventHandlers, checkpointStore, options diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index 25a3f5facdcb..69d4fbf280e7 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -4,13 +4,12 @@ import { isTokenCredential, TokenCredential } from "@azure/core-amqp"; import { getTracer } from "@azure/core-tracing"; import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api"; -import { ConnectionContext } from "./connectionContext"; +import { ConnectionContext, createConnectionContext } from "./connectionContext"; import { instrumentEventData, TRACEPARENT_PROPERTY } from "./diagnostics/instrumentEventData"; import { createMessageSpan } from "./diagnostics/messageSpan"; import { EventData } from "./eventData"; import { EventDataBatch, EventDataBatchImpl, isEventDataBatch } from "./eventDataBatch"; import { EventHubSender } from './eventHubSender'; -import { createConnectionContext } from "./impl/eventHubClient"; import { logErrorStackTrace, logger } from './log'; import { EventHubProperties, PartitionProperties } from "./managementClient"; import { diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index 1a48c5b859bd..bd8ef5a002a2 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -12,7 +12,7 @@ import { } from "rhea-promise"; import { Constants, MessagingError, delay, translate } from "@azure/core-amqp"; import { EventDataInternal, ReceivedEventData, fromAmqpMessage } from "./eventData"; -import { EventHubConsumerOptions } from "./impl/eventHubClient"; +import { EventHubConsumerOptions } from "./models/private"; import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; import { EventPosition, getEventPositionFilter } from "./eventPosition"; diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index e27a63f6db50..e031696a754e 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -2,7 +2,6 @@ // Licensed under the MIT license. import { v4 as uuid } from "uuid"; -import { EventHubClient } from "./impl/eventHubClient"; import { PumpManager, PumpManagerImpl } from "./pumpManager"; import { AbortController, AbortSignalLike } from "@azure/abort-controller"; import { logErrorStackTrace, logger } from "./log"; @@ -13,6 +12,7 @@ import { EventPosition, isEventPosition, latestEventPosition } from "./eventPosi import { delayWithoutThrow } from "./util/delayWithoutThrow"; import { CommonEventProcessorOptions } from "./models/private"; import { CloseReason } from "./models/public"; +import { ConnectionContext } from "./connectionContext"; /** * An interface representing the details on which instance of a `EventProcessor` owns processing @@ -170,7 +170,6 @@ export interface FullEventProcessorOptions extends CommonEventProcessorOptions { * @ignore */ export class EventProcessor { - private _consumerGroup: string; private _processorOptions: FullEventProcessorOptions; private _pumpManager: PumpManager; private _id: string; @@ -180,6 +179,8 @@ export class EventProcessor { private _processingTarget: PartitionLoadBalancer | string; private _loopIntervalInMs = 10000; private _inactiveTimeLimitInMs = 60000; + private _eventHubName: string; + private _fullyQualifiedNamespace: string; /** * @param consumerGroup The name of the consumer group from which you want to process events. @@ -194,8 +195,8 @@ export class EventProcessor { * passing the data to user code for processing. If not provided, it defaults to 60 seconds. */ constructor( - consumerGroup: string, - private _eventHubClient: EventHubClient, + private _consumerGroup: string, + private _context: ConnectionContext, private _subscriptionEventHandlers: SubscriptionEventHandlers, private _checkpointStore: CheckpointStore, options: FullEventProcessorOptions @@ -208,7 +209,8 @@ export class EventProcessor { logger.verbose(`Starting event processor with autogenerated ID ${this._id}`); } - this._consumerGroup = consumerGroup; + this._eventHubName = this._context.config.entityPath; + this._fullyQualifiedNamespace = this._context.config.host; this._processorOptions = options; this._pumpManager = options.pumpManager || new PumpManagerImpl(this._id, this._processorOptions); @@ -237,9 +239,9 @@ export class EventProcessor { const partitionOwnership: PartitionOwnership = { ownerId: this._id, partitionId: partitionIdToClaim, - fullyQualifiedNamespace: this._eventHubClient.fullyQualifiedNamespace, + fullyQualifiedNamespace: this._fullyQualifiedNamespace, consumerGroup: this._consumerGroup, - eventHubName: this._eventHubClient.eventHubName, + eventHubName: this._eventHubName, etag: previousPartitionOwnership ? previousPartitionOwnership.etag : undefined }; @@ -308,8 +310,8 @@ export class EventProcessor { this._subscriptionEventHandlers, this._checkpointStore, { - fullyQualifiedNamespace: this._eventHubClient.fullyQualifiedNamespace, - eventHubName: this._eventHubClient.eventHubName, + fullyQualifiedNamespace: this._fullyQualifiedNamespace, + eventHubName: this._eventHubName, consumerGroup: this._consumerGroup, partitionId: partitionId, eventProcessorId: this.id @@ -319,7 +321,7 @@ export class EventProcessor { const eventPosition = await this._getStartingPosition(partitionId); await this._pumpManager.createPump( eventPosition, - this._eventHubClient, + this._context, partitionProcessor, abortSignal ); @@ -329,8 +331,8 @@ export class EventProcessor { private async _getStartingPosition(partitionIdToClaim: string): Promise { const availableCheckpoints = await this._checkpointStore.listCheckpoints( - this._eventHubClient.fullyQualifiedNamespace, - this._eventHubClient.eventHubName, + this._fullyQualifiedNamespace, + this._eventHubName, this._consumerGroup ); @@ -404,8 +406,8 @@ export class EventProcessor { const partitionOwnershipMap: Map = new Map(); // Retrieve current partition ownership details from the datastore. const partitionOwnership = await this._checkpointStore.listOwnership( - this._eventHubClient.fullyQualifiedNamespace, - this._eventHubClient.eventHubName, + this._fullyQualifiedNamespace, + this._eventHubName, this._consumerGroup ); @@ -419,7 +421,7 @@ export class EventProcessor { partitionOwnershipMap.set(ownership.partitionId, ownership); } - const partitionIds = await this._eventHubClient.getPartitionIds({ + const { partitionIds } = await this._context.managementSession!.getEventHubProperties({ abortSignal: abortSignal }); @@ -486,8 +488,8 @@ export class EventProcessor { if (this._subscriptionEventHandlers.processError) { try { await this._subscriptionEventHandlers.processError(err, { - fullyQualifiedNamespace: this._eventHubClient.fullyQualifiedNamespace, - eventHubName: this._eventHubClient.eventHubName, + fullyQualifiedNamespace: this._fullyQualifiedNamespace, + eventHubName: this._eventHubName, consumerGroup: this._consumerGroup, partitionId: "", updateCheckpoint: async () => {} @@ -579,8 +581,8 @@ export class EventProcessor { private async abandonPartitionOwnerships() { logger.verbose(`[${this._id}] Abandoning owned partitions`); const allOwnerships = await this._checkpointStore.listOwnership( - this._eventHubClient.fullyQualifiedNamespace, - this._eventHubClient.eventHubName, + this._fullyQualifiedNamespace, + this._eventHubName, this._consumerGroup ); const ourOwnerships = allOwnerships.filter((ownership) => ownership.ownerId === this._id); diff --git a/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts b/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts deleted file mode 100644 index cbae69adeda9..000000000000 --- a/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts +++ /dev/null @@ -1,317 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -import { - ConnectionConfig, - EventHubConnectionConfig, - EventHubConnectionStringModel, - RetryOptions, - SharedKeyCredential, - TokenCredential, - isTokenCredential, - parseConnectionString -} from "@azure/core-amqp"; - -import { ConnectionContext } from "../connectionContext"; -import { EventHubProperties, PartitionProperties } from "../managementClient"; -import { EventPosition } from "../eventPosition"; -import { EventHubConsumer } from "../receiver"; -import { throwErrorIfConnectionClosed } from "../util/error"; -import { - EventHubClientOptions, - GetEventHubPropertiesOptions, - GetPartitionIdsOptions, - GetPartitionPropertiesOptions -} from "../models/public"; - -/** - * The set of options to configure the behavior of an `EventHubConsumer`. - * These can be specified when creating the consumer using the `createConsumer` method. - * - `ownerLevel` : A number indicating that the consumer intends to be an exclusive consumer of events resulting in other - * consumers to fail if their `ownerLevel` is lower or doesn't exist. - * - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events. - * A simple usage can be `{ "maxRetries": 4 }`. - * - * Example usage: - * ```js - * { - * retryOptions: { - * maxRetries: 4 - * }, - * trackLastEnqueuedEventProperties: false - * } - * ``` - * @internal - * @ignore - */ -export interface EventHubConsumerOptions { - /** - * @property - * The owner level associated with an exclusive consumer. - * - * When provided, the owner level indicates that a consumer is intended to be the exclusive receiver of events for the - * requested partition and the associated consumer group. - * When multiple consumers exist for the same partition/consumer group pair, then the ones with lower or no - * `ownerLevel` will get a `ReceiverDisconnectedError` during the next attempted receive operation. - */ - ownerLevel?: number; - /** - * @property - * The retry options used to govern retry attempts when an issue is encountered while receiving events. - * If no value is provided here, the retry options set when creating the `EventHubClient` is used. - */ - retryOptions?: RetryOptions; - /** - * @property - * Indicates whether or not the consumer should request information on the last enqueued event on its - * associated partition, and track that information as events are received. - - * When information about the partition's last enqueued event is being tracked, each event received - * from the Event Hubs service will carry metadata about the partition that it otherwise would not. This results in a small amount of - * additional network bandwidth consumption that is generally a favorable trade-off when considered - * against periodically making requests for partition properties using the Event Hub client. - */ - trackLastEnqueuedEventProperties?: boolean; -} - -/** - * @class - * The client is the main point of interaction with Azure Event Hubs service. - * It offers connection to a specific Event Hub within the Event Hubs namespace along with - * operations for sending event data, receiving events, and inspecting the connected Event Hub. - * - * There are multiple ways to create an `EventHubClient` - * - Use the connection string from the SAS policy created for your Event Hub instance. - * - Use the connection string from the SAS policy created for your Event Hub namespace, - * and the name of the Event Hub instance - * - Use the fully qualified domain name of your Event Hub namespace like `.servicebus.windows.net`, - * and a credentials object. - * - * @internal - * @ignore - */ -export class EventHubClient { - /** - * Describes the amqp connection context for the eventhub client. - */ - private _context: ConnectionContext; - - /** - * The options passed by the user when creating the EventHubClient instance. - */ - private _clientOptions: EventHubClientOptions; - - /** - * The Service Bus endpoint. - * @internal - * @ignore - */ - public readonly endpoint: string; - - /** - * @property - * @readonly - * The name of the Event Hub instance for which this client is created. - */ - get eventHubName(): string { - return this._context.config.entityPath; - } - - /** - * @property - * @readonly - * The fully qualified Event Hubs namespace for which this client is created. This is likely to be similar to - * .servicebus.windows.net. - */ - get fullyQualifiedNamespace(): string { - return this._context.config.host; - } - - constructor(connectionString: string, options?: EventHubClientOptions); - constructor(connectionString: string, eventHubName: string, options?: EventHubClientOptions); - constructor( - host: string, - eventHubName: string, - credential: TokenCredential, - options?: EventHubClientOptions - ); - constructor( - hostOrConnectionString: string, - eventHubNameOrOptions?: string | EventHubClientOptions, - credentialOrOptions?: TokenCredential | EventHubClientOptions, - options?: EventHubClientOptions - ) { - this._context = createConnectionContext( - hostOrConnectionString, - eventHubNameOrOptions, - credentialOrOptions, - options - ); - this.endpoint = this._context.config.endpoint; - if (typeof eventHubNameOrOptions !== "string") { - this._clientOptions = eventHubNameOrOptions || {}; - } else if (!isTokenCredential(credentialOrOptions)) { - this._clientOptions = credentialOrOptions || {}; - } else { - this._clientOptions = options || {}; - } - } - - /** - * Closes the AMQP connection to the Event Hub instance, - * returning a promise that will be resolved when disconnection is completed. - * @returns Promise - * @throws Error if the underlying connection encounters an error while closing. - */ - async close(): Promise { - return this._context.close(); - } - - /** - * Creates an Event Hub consumer that can receive events from a specific Event Hub partition, - * in the context of a specific consumer group. - * - * Multiple consumers are allowed on the same partition in a consumer group. - * If there is a need to have an exclusive consumer for a partition in a consumer group, - * then specify the `ownerLevel` in the `options`. - * Exclusive consumers were previously referred to as "Epoch Receivers". - * - * @param consumerGroup The name of the consumer group this consumer is associated with. - * Events are read in the context of this group. You can get this information from Azure portal. - * @param partitionId The identifier of the Event Hub partition from which events will be received. - * You can get identifiers for all partitions by using the `getPartitionProperties` method on the `EventHubClient`. - * @param eventPosition The position within the partition where the consumer should begin reading events. - * @param options The set of options to apply when creating the consumer. - * - `ownerLevel` : A number indicating that the consumer intends to be an exclusive consumer of events resulting in other - * consumers to fail if their `ownerLevel` is lower or doesn't exist. - * - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events. - * A simple usage can be `{ "maxRetries": 4 }`. - * - * @throws Error if the underlying connection has been closed, create a new EventHubClient. - * @throws TypeError if a required parameter is missing. - */ - createConsumer( - consumerGroup: string, - partitionId: string, - eventPosition: EventPosition, - options?: EventHubConsumerOptions - ): EventHubConsumer { - if (!options) { - options = {}; - } - if (!options.retryOptions) { - options.retryOptions = this._clientOptions.retryOptions; - } - throwErrorIfConnectionClosed(this._context); - partitionId = String(partitionId); - return new EventHubConsumer(this._context, consumerGroup, partitionId, eventPosition, options); - } - - /** - * Provides the Event Hub runtime information. - * @param [options] The set of options to apply to the operation call. - * @returns A promise that resolves with EventHubProperties. - * @throws Error if the underlying connection has been closed, create a new EventHubClient. - * @throws AbortError if the operation is cancelled via the abortSignal3. - */ - async getProperties(options: GetEventHubPropertiesOptions = {}): Promise { - return this._context.managementSession!.getEventHubProperties({ - retryOptions: this._clientOptions.retryOptions, - ...options - }); - } - - /** - * Provides an array of partitionIds. - * @param [options] The set of options to apply to the operation call. - * @returns A promise that resolves with an Array of strings. - * @throws Error if the underlying connection has been closed, create a new EventHubClient. - * @throws AbortError if the operation is cancelled via the abortSignal. - */ - async getPartitionIds(options: GetPartitionIdsOptions): Promise> { - const properties = await this.getProperties(options); - return properties.partitionIds; - } - - /** - * Provides information about the specified partition. - * @param partitionId Partition ID for which partition information is required. - * @param [options] The set of options to apply to the operation call. - * @returns A promise that resoloves with PartitionProperties. - * @throws Error if the underlying connection has been closed, create a new EventHubClient. - * @throws AbortError if the operation is cancelled via the abortSignal. - */ - async getPartitionProperties( - partitionId: string, - options: GetPartitionPropertiesOptions = {} - ): Promise { - return this._context.managementSession!.getPartitionProperties(partitionId, { - retryOptions: this._clientOptions.retryOptions, - ...options - }); - } -} - -export function createConnectionContext( - hostOrConnectionString: string, - eventHubNameOrOptions?: string | EventHubClientOptions, - credentialOrOptions?: TokenCredential | EventHubClientOptions, - options?: EventHubClientOptions -): ConnectionContext { - let connectionString; - let config; - let credential: TokenCredential | SharedKeyCredential; - hostOrConnectionString = String(hostOrConnectionString); - - if (!isTokenCredential(credentialOrOptions)) { - const parsedCS = parseConnectionString(hostOrConnectionString); - if ( - !(parsedCS.EntityPath || (typeof eventHubNameOrOptions === "string" && eventHubNameOrOptions)) - ) { - throw new TypeError( - `Either provide "eventHubName" or the "connectionString": "${hostOrConnectionString}", ` + - `must contain "EntityPath=".` - ); - } - if ( - parsedCS.EntityPath && - typeof eventHubNameOrOptions === "string" && - eventHubNameOrOptions && - parsedCS.EntityPath !== eventHubNameOrOptions - ) { - throw new TypeError( - `The entity path "${parsedCS.EntityPath}" in connectionString: "${hostOrConnectionString}" ` + - `doesn't match with eventHubName: "${eventHubNameOrOptions}".` - ); - } - connectionString = hostOrConnectionString; - if (typeof eventHubNameOrOptions !== "string") { - // connectionstring and/or options were passed to constructor - config = EventHubConnectionConfig.create(connectionString); - options = eventHubNameOrOptions; - } else { - // connectionstring, eventHubName and/or options were passed to constructor - const eventHubName = eventHubNameOrOptions; - config = EventHubConnectionConfig.create(connectionString, eventHubName); - options = credentialOrOptions; - } - // Since connectionstring was passed, create a SharedKeyCredential - credential = new SharedKeyCredential(config.sharedAccessKeyName, config.sharedAccessKey); - } else { - // host, eventHubName, a TokenCredential and/or options were passed to constructor - const eventHubName = eventHubNameOrOptions; - let host = hostOrConnectionString; - credential = credentialOrOptions; - if (!eventHubName) { - throw new TypeError(`"eventHubName" is missing`); - } - - if (!host.endsWith("/")) host += "/"; - connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue;EntityPath=${eventHubName}`; - config = EventHubConnectionConfig.create(connectionString); - } - - ConnectionConfig.validate(config); - - return ConnectionContext.create(config, credential, options); -} diff --git a/sdk/eventhub/event-hubs/src/models/private.ts b/sdk/eventhub/event-hubs/src/models/private.ts index 1e605103ada5..384b6c7d3250 100644 --- a/sdk/eventhub/event-hubs/src/models/private.ts +++ b/sdk/eventhub/event-hubs/src/models/private.ts @@ -73,4 +73,58 @@ export interface CommonEventProcessorOptions // make the 'maxBatchSize', 'maxWa * Setting this value to 0 will cause the default value to be used. */ inactiveTimeLimitInMs?: number; + /** + * Retry Options to be used when receiving events + */ + retryOptions?: RetryOptions; } + +/** + * The set of options to configure the behavior of an `EventHubConsumer`. + * These can be specified when creating the consumer using the `createConsumer` method. + * - `ownerLevel` : A number indicating that the consumer intends to be an exclusive consumer of events resulting in other + * consumers to fail if their `ownerLevel` is lower or doesn't exist. + * - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events. + * A simple usage can be `{ "maxRetries": 4 }`. + * + * Example usage: + * ```js + * { + * retryOptions: { + * maxRetries: 4 + * }, + * trackLastEnqueuedEventProperties: false + * } + * ``` + * @internal + * @ignore + */ +export interface EventHubConsumerOptions { + /** + * @property + * The owner level associated with an exclusive consumer. + * + * When provided, the owner level indicates that a consumer is intended to be the exclusive receiver of events for the + * requested partition and the associated consumer group. + * When multiple consumers exist for the same partition/consumer group pair, then the ones with lower or no + * `ownerLevel` will get a `ReceiverDisconnectedError` during the next attempted receive operation. + */ + ownerLevel?: number; + /** + * @property + * The retry options used to govern retry attempts when an issue is encountered while receiving events. + * If no value is provided here, the retry options set when creating the `EventHubClient` is used. + */ + retryOptions?: RetryOptions; + /** + * @property + * Indicates whether or not the consumer should request information on the last enqueued event on its + * associated partition, and track that information as events are received. + + * When information about the partition's last enqueued event is being tracked, each event received + * from the Event Hubs service will carry metadata about the partition that it otherwise would not. This results in a small amount of + * additional network bandwidth consumption that is generally a favorable trade-off when considered + * against periodically making requests for partition properties using the Event Hub client. + */ + trackLastEnqueuedEventProperties?: boolean; +} \ No newline at end of file diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index 54c434c78e80..197502d71158 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -4,7 +4,6 @@ import { logErrorStackTrace, logger } from "./log"; import { CommonEventProcessorOptions } from "./models/private"; import { CloseReason } from "./models/public"; -import { EventHubClient } from "./impl/eventHubClient"; import { EventPosition } from "./eventPosition"; import { PartitionProcessor } from "./partitionProcessor"; import { EventHubConsumer } from "./receiver"; @@ -15,27 +14,25 @@ import { getTracer } from "@azure/core-tracing"; import { CanonicalCode, Link, Span, SpanKind } from "@opentelemetry/api"; import { extractSpanContextFromEventData } from "./diagnostics/instrumentEventData"; import { ReceivedEventData } from "./eventData"; +import { ConnectionContext } from "./connectionContext"; /** * @ignore * @internal */ export class PartitionPump { - private _eventHubClient: EventHubClient; private _partitionProcessor: PartitionProcessor; private _processorOptions: CommonEventProcessorOptions; private _receiver: EventHubConsumer | undefined; private _isReceiving: boolean = false; private _isStopped: boolean = false; private _abortController: AbortController; - constructor( - eventHubClient: EventHubClient, + private _context: ConnectionContext, partitionProcessor: PartitionProcessor, private readonly _startPosition: EventPosition, options: CommonEventProcessorOptions ) { - this._eventHubClient = eventHubClient; this._partitionProcessor = partitionProcessor; this._processorOptions = options; this._abortController = new AbortController(); @@ -63,13 +60,15 @@ export class PartitionPump { } private async _receiveEvents(partitionId: string): Promise { - this._receiver = this._eventHubClient.createConsumer( + this._receiver = new EventHubConsumer( + this._context, this._partitionProcessor.consumerGroup, partitionId, this._startPosition, { ownerLevel: this._processorOptions.ownerLevel, - trackLastEnqueuedEventProperties: this._processorOptions.trackLastEnqueuedEventProperties + trackLastEnqueuedEventProperties: this._processorOptions.trackLastEnqueuedEventProperties, + retryOptions: this._processorOptions.retryOptions } ); @@ -94,7 +93,10 @@ export class PartitionPump { const span = createProcessingSpan( receivedEvents, - this._eventHubClient, + { + eventHubName: this._context.config.entityPath, + endpoint: this._context.config.endpoint + }, this._processorOptions ); diff --git a/sdk/eventhub/event-hubs/src/pumpManager.ts b/sdk/eventhub/event-hubs/src/pumpManager.ts index 1215989ca806..1471e4a435b4 100644 --- a/sdk/eventhub/event-hubs/src/pumpManager.ts +++ b/sdk/eventhub/event-hubs/src/pumpManager.ts @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { EventHubClient } from "./impl/eventHubClient"; import { EventPosition } from "./eventPosition"; import { CommonEventProcessorOptions } from "./models/private"; import { CloseReason } from "./models/public"; @@ -9,6 +8,7 @@ import { PartitionProcessor } from "./partitionProcessor"; import { PartitionPump } from "./partitionPump"; import { logErrorStackTrace, logger } from "./log"; import { AbortSignalLike } from "@azure/abort-controller"; +import { ConnectionContext } from "./connectionContext"; /** * The PumpManager handles the creation and removal of PartitionPumps. @@ -28,7 +28,7 @@ export interface PumpManager { */ createPump( startPosition: EventPosition, - eventHubClient: EventHubClient, + connectionContext: ConnectionContext, partitionProcessor: PartitionProcessor, abortSignal: AbortSignalLike ): Promise; @@ -96,13 +96,13 @@ export class PumpManagerImpl implements PumpManager { /** * Creates and starts a PartitionPump. * @param startPosition The position in the partition to start reading from. - * @param eventHubClient The EventHubClient to forward to the PartitionPump. + * @param connectionContext The ConnectionContext to forward to the PartitionPump. * @param partitionProcessor The PartitionProcessor to forward to the PartitionPump. * @ignore */ public async createPump( startPosition: EventPosition, - eventHubClient: EventHubClient, + connectionContext: ConnectionContext, partitionProcessor: PartitionProcessor, abortSignal: AbortSignalLike ): Promise { @@ -131,7 +131,7 @@ export class PumpManagerImpl implements PumpManager { logger.verbose(`[${this._eventProcessorName}] [${partitionId}] Creating a new pump.`); const pump = new PartitionPump( - eventHubClient, + connectionContext, partitionProcessor, startPosition, this._options diff --git a/sdk/eventhub/event-hubs/src/receiver.ts b/sdk/eventhub/event-hubs/src/receiver.ts index e2ec3f91c93c..abdda13d96f7 100644 --- a/sdk/eventhub/event-hubs/src/receiver.ts +++ b/sdk/eventhub/event-hubs/src/receiver.ts @@ -3,23 +3,10 @@ import { logErrorStackTrace, logger } from "./log"; import { ConnectionContext } from "./connectionContext"; -import { EventHubConsumerOptions } from "./impl/eventHubClient"; -import { - EventHubReceiver, - LastEnqueuedEventProperties, - OnError, - OnMessage -} from "./eventHubReceiver"; +import { EventHubConsumerOptions } from "./models/private"; +import { EventHubReceiver, LastEnqueuedEventProperties } from "./eventHubReceiver"; import { ReceivedEventData } from "./eventData"; -import { - Constants, - MessagingError, - RetryConfig, - RetryOperationType, - RetryOptions, - retry -} from "@azure/core-amqp"; -import { ReceiveHandler } from "./receiveHandler"; +import { RetryConfig, RetryOperationType, RetryOptions, retry } from "@azure/core-amqp"; import { AbortError, AbortSignalLike } from "@azure/abort-controller"; import { throwErrorIfConnectionClosed } from "./util/error"; import { EventPosition } from "./eventPosition"; @@ -52,19 +39,10 @@ export class EventHubConsumer { * @property Describes the amqp connection context for the QueueClient. */ private _context: ConnectionContext; - /** - * @property The consumer group from which the receiver should receive events from. - */ - private _consumerGroup: string; /** * @property Denotes if close() was called on this receiver */ private _isClosed: boolean = false; - /** - * @property The identifier of the Event Hub partition that this consumer is associated with. - * Events will be read only from this partition. - */ - private _partitionId: string; /** * @property The set of options to configure the behavior of an EventHubConsumer. */ @@ -97,37 +75,6 @@ export class EventHubConsumer { return this._isClosed || this._context.wasConnectionCloseCalled; } - /** - * @property The identifier of the Event Hub partition that this consumer is associated with. - * Events will be read only from this partition. - * @readonly - */ - public get partitionId(): string { - return this._partitionId; - } - - /** - * @property The name of the consumer group that this consumer is associated with. - * Events will be read only in the context of this group. - * @readonly - */ - get consumerGroup(): string { - return this._consumerGroup; - } - - /** - * @property The owner level associated with an exclusive consumer; for a non-exclusive consumer, this value will be null or undefined. - * - * When provided, the owner level indicates that a consumer is intended to be the exclusive receiver of events for the - * requested partition and the associated consumer group. - * When multiple consumers exist for the same partition/consumer group pair, then the ones with lower or no - * `ownerLevel` will get a `ReceiverDisconnectedError` during the next attempted receive operation. - * @readonly - */ - get ownerLevel(): number | undefined { - return this._receiverOptions.ownerLevel; - } - /** * Indicates whether the consumer is currently receiving messages or not. * When this returns true, new `receive()` or `receiveBatch()` calls cannot be made. @@ -151,8 +98,6 @@ export class EventHubConsumer { options?: EventHubConsumerOptions ) { this._context = context; - this._consumerGroup = consumerGroup; - this._partitionId = partitionId; this._lastEnqueuedEventProperties = {}; this._receiverOptions = options || {}; this._retryOptions = this._receiverOptions.retryOptions || {}; @@ -164,85 +109,6 @@ export class EventHubConsumer { options ); } - /** - * Starts receiving events from the service and calls the user provided message handler for each event. - * Returns an object that can be used to query the state of the receiver and to stop receiving events as well. - * - * @param onMessage The message handler to receive event data objects. - * @param onError The error handler for errora that can occur when receiving events. - * @param abortSignal An implementation of the `AbortSignalLike` interface to signal the request to cancel the operation. - * For example, use the @azure/abort-controller to create an `AbortSignal`. - * @returns ReceiveHandler - An object that provides a mechanism to stop receiving more messages. - * @throws AbortError if the operation is cancelled via the abortSignal. - * @throws TypeError if a required parameter is missing. - * @throws Error if the underlying connection or receiver has been closed. - * Create a new EventHubConsumer using the EventHubClient createConsumer method. - * @throws Error if the receiver is already receiving messages. - */ - receive(onMessage: OnMessage, onError: OnError, abortSignal?: AbortSignalLike): ReceiveHandler { - this._throwIfReceiverOrConnectionClosed(); - this._throwIfAlreadyReceiving(); - const baseConsumer = this._baseConsumer!; - - if (typeof onMessage !== "function") { - throw new TypeError("The parameter 'onMessage' must be of type 'function'."); - } - if (typeof onError !== "function") { - throw new TypeError("The parameter 'onError' must be of type 'function'."); - } - - // return immediately if the abortSignal is already aborted. - if (abortSignal && abortSignal.aborted) { - onError(new AbortError("The receive operation has been cancelled by the user.")); - // close this receiver when user triggers a cancellation. - this.close().catch(() => {}); // no-op close error handler - return new ReceiveHandler(baseConsumer); - } - - const wrappedOnError = (error: Error) => { - // ignore retryable errors - if ((error as MessagingError).retryable) { - return; - } - - logger.warning( - "[%s] Since the error is not retryable, we let the user know about it by calling the user's error handler.", - this._context.connectionId - ); - logErrorStackTrace(error); - - if (error.name === "AbortError") { - // close this receiver when user triggers a cancellation. - this.close().catch(() => {}); // no-op close error handler - } - onError(error); - }; - - const onAbort = () => { - if (this._baseConsumer) { - this._baseConsumer.abort(); - } - }; - - baseConsumer.registerHandlers( - onMessage, - wrappedOnError, - Constants.defaultPrefetchCount, - true, - abortSignal, - onAbort - ); - - if ( - this._receiverOptions.trackLastEnqueuedEventProperties && - this._baseConsumer && - this._baseConsumer.runtimeInfo - ) { - this._lastEnqueuedEventProperties = this._baseConsumer.runtimeInfo; - } - - return new ReceiveHandler(baseConsumer); - } /** * Returns a promise that resolves to an array of events received from the service. @@ -459,4 +325,4 @@ export class EventHubConsumer { throw error; } } -} +} \ No newline at end of file diff --git a/sdk/eventhub/event-hubs/test/client.spec.ts b/sdk/eventhub/event-hubs/test/client.spec.ts index de37e5515afc..122f6cecab8a 100644 --- a/sdk/eventhub/event-hubs/test/client.spec.ts +++ b/sdk/eventhub/event-hubs/test/client.spec.ts @@ -457,7 +457,7 @@ describe("EventHubConsumerClient User Agent String", function(): void { env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], env[EnvVarKeys.EVENTHUB_NAME] ); - testUserAgentString(consumerClient["_eventHubClient"]["_context"]); + testUserAgentString(consumerClient["_context"]); await consumerClient.close(); }); @@ -469,7 +469,7 @@ describe("EventHubConsumerClient User Agent String", function(): void { env[EnvVarKeys.EVENTHUB_NAME], { userAgent: customUserAgent } ); - testUserAgentString(consumerClient["_eventHubClient"]["_context"], customUserAgent); + testUserAgentString(consumerClient["_context"], customUserAgent); await consumerClient.close(); }); }); diff --git a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts index f1224903b384..5b990694be87 100644 --- a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts @@ -10,7 +10,6 @@ import { logger, CloseReason } from "../src"; -import { EventHubClient } from "../src/impl/eventHubClient"; import { EventHubConsumerClient, isCheckpointStore } from "../src/eventHubConsumerClient"; import { EnvVarKeys, getEnvVars, loopUntil, getStartingPositionsForTests } from "./utils/testUtils"; import chai from "chai"; @@ -19,6 +18,7 @@ import { LogTester } from "./utils/logHelpers"; import { InMemoryCheckpointStore } from "../src/inMemoryCheckpointStore"; import { EventProcessor, FullEventProcessorOptions } from "../src/eventProcessor"; import { SinonStubbedInstance, createStubInstance } from "sinon"; +import { ConnectionContext } from "../src/connectionContext"; const should = chai.should(); const env = getEnvVars(); @@ -83,15 +83,13 @@ describe("EventHubConsumerClient", () => { }; const fakeEventProcessorConstructor = ( - consumerGroup: string, - eventHubClient: EventHubClient, + connectionContext: ConnectionContext, subscriptionEventHandlers: SubscriptionEventHandlers, checkpointStore: CheckpointStore, options: FullEventProcessorOptions ) => { - consumerGroup.should.equal(EventHubConsumerClient.defaultConsumerGroupName); subscriptionEventHandlers.should.equal(subscriptionHandlers); - (typeof eventHubClient.createConsumer).should.equal("function"); + should.exist(connectionContext.managementSession); isCheckpointStore(checkpointStore).should.be.ok; validateOptions(options); diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index d684bd4ce4d4..6fbe6112787e 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -11,7 +11,6 @@ import { CheckpointStore, CloseReason, EventData, - LastEnqueuedEventProperties, PartitionOwnership, ReceivedEventData, SubscriptionEventHandlers, @@ -20,7 +19,6 @@ import { EventHubConsumerClient, EventHubProducerClient } from "../src"; -import { EventHubClient } from "../src/impl/eventHubClient"; import { EnvVarKeys, getEnvVars, loopUntil } from "./utils/testUtils"; import { Dictionary, generate_uuid } from "rhea-promise"; import { EventProcessor, FullEventProcessorOptions } from "../src/eventProcessor"; @@ -36,7 +34,6 @@ import { import { GreedyPartitionLoadBalancer, PartitionLoadBalancer } from "../src/partitionLoadBalancer"; import { AbortError, AbortSignal } from "@azure/abort-controller"; import { FakeSubscriptionEventHandlers } from "./utils/fakeSubscriptionEventHandlers"; -import sinon from "sinon"; import { isLatestPosition } from "../src/eventPosition"; import { AbortController } from "@azure/abort-controller"; const env = getEnvVars(); @@ -52,8 +49,8 @@ describe("Event Processor", function(): void { connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], path: env[EnvVarKeys.EVENTHUB_NAME] }; - let client: EventHubClient; let producerClient: EventHubProducerClient; + let consumerClient: EventHubConsumerClient; before("validate environment", async function(): Promise { should.exist( @@ -67,19 +64,28 @@ describe("Event Processor", function(): void { }); beforeEach("create the client", function() { - client = new EventHubClient(service.connectionString, service.path, {}); producerClient = new EventHubProducerClient(service.connectionString, service.path); + consumerClient = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString, + service.path + ); }); afterEach("close the connection", async function(): Promise { - await client.close(); await producerClient.close(); }); describe("unit tests", () => { describe("_getStartingPosition", () => { before(() => { - client["getPartitionIds"] = async () => ["0", "1"]; + consumerClient["_context"].managementSession!.getEventHubProperties = async () => { + return Promise.resolve({ + name: "boo", + createdOn: new Date(), + partitionIds: ["0", "1"] + }); + }; }); it("no checkpoint or user specified default", async () => { @@ -156,7 +162,7 @@ describe("Event Processor", function(): void { ) { return new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], { processEvents: async () => {}, processError: async () => {} @@ -214,7 +220,7 @@ describe("Event Processor", function(): void { // it's only here so we can call a few private methods on it. eventProcessor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], { processEvents: async () => {}, processError: async (err, context) => { @@ -293,7 +299,7 @@ describe("Event Processor", function(): void { const eventProcessor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], { processEvents: async () => {}, processError: async () => {} @@ -344,20 +350,25 @@ describe("Event Processor", function(): void { originalClaimedPartitions.sort((a, b) => a.partitionId.localeCompare(b.partitionId)); - const fakeEventHubClient = sinon.createStubInstance(EventHubClient); const partitionIds = ["1001", "1002", "1003"]; - fakeEventHubClient.getPartitionIds.resolves(partitionIds); - sinon.replaceGetter(fakeEventHubClient, "eventHubName", () => commonFields.eventHubName); - sinon.replaceGetter( - fakeEventHubClient, - "fullyQualifiedNamespace", - () => commonFields.fullyQualifiedNamespace - ); + const fakeConnectionContext = { + managementSession: { + getEventHubProperties: async () => { + return { + partitionIds + }; + } + }, + config: { + entityPath: commonFields.eventHubName, + host: commonFields.fullyQualifiedNamespace + } + }; const ep = new EventProcessor( commonFields.consumerGroup, - fakeEventHubClient as any, + fakeConnectionContext as any, handlers, checkpointStore, { @@ -481,7 +492,7 @@ describe("Event Processor", function(): void { const eventProcessor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], { processEvents: async () => {}, processError: async (err, _) => { @@ -525,7 +536,7 @@ describe("Event Processor", function(): void { const eventProcessor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], { processClose: async () => { throw new Error("processClose() error"); @@ -577,7 +588,7 @@ describe("Event Processor", function(): void { it("should expose an id", async function(): Promise { const processor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], { processEvents: async () => {}, processError: async () => {} @@ -596,7 +607,7 @@ describe("Event Processor", function(): void { it("id can be forced to be a specific value", async function(): Promise { const processor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], { processEvents: async () => {}, processError: async () => {} @@ -609,7 +620,7 @@ describe("Event Processor", function(): void { }); it("should treat consecutive start invocations as idempotent", async function(): Promise { - const partitionIds = await client.getPartitionIds({}); + const partitionIds = await producerClient.getPartitionIds(); // ensure we have at least 2 partitions partitionIds.length.should.gte(2); @@ -621,7 +632,7 @@ describe("Event Processor", function(): void { const processor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], subscriptionEventHandler, new InMemoryCheckpointStore(), { @@ -652,7 +663,7 @@ describe("Event Processor", function(): void { const processor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], { processInitialize: async () => { didPartitionProcessorStart = true; @@ -674,7 +685,7 @@ describe("Event Processor", function(): void { }); it("should support start after stopping", async function(): Promise { - const partitionIds = await client.getPartitionIds({}); + const partitionIds = await producerClient.getPartitionIds(); // ensure we have at least 2 partitions partitionIds.length.should.gte(2); @@ -687,7 +698,7 @@ describe("Event Processor", function(): void { const processor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], subscriptionEventHandler, new InMemoryCheckpointStore(), { @@ -732,7 +743,7 @@ describe("Event Processor", function(): void { it("should support processing events across multiple partitions", async function(): Promise< void > { - const partitionIds = await client.getPartitionIds({}); + const partitionIds = await producerClient.getPartitionIds(); const { subscriptionEventHandler, startPosition @@ -740,7 +751,7 @@ describe("Event Processor", function(): void { const processor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], subscriptionEventHandler, new InMemoryCheckpointStore(), { @@ -822,7 +833,7 @@ describe("Event Processor", function(): void { }); it("should receive events from the checkpoint", async function(): Promise { - const partitionIds = await client.getPartitionIds({}); + const partitionIds = await producerClient.getPartitionIds(); // ensure we have at least 2 partitions partitionIds.length.should.gte(2); @@ -864,7 +875,7 @@ describe("Event Processor", function(): void { const inMemoryCheckpointStore = new InMemoryCheckpointStore(); const processor1 = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], new FooPartitionProcessor(), inMemoryCheckpointStore, { @@ -910,15 +921,15 @@ describe("Event Processor", function(): void { const processor2 = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], new FooPartitionProcessor(), inMemoryCheckpointStore, { ...defaultOptions, startPosition: earliestEventPosition } ); const checkpoints = await inMemoryCheckpointStore.listCheckpoints( - client.fullyQualifiedNamespace, - client.eventHubName, + consumerClient.fullyQualifiedNamespace, + consumerClient.eventHubName, EventHubConsumerClient.defaultConsumerGroupName ); @@ -1040,7 +1051,7 @@ describe("Event Processor", function(): void { describe("Load balancing", function(): void { beforeEach("validate partitions", async function(): Promise { - const partitionIds = await client.getPartitionIds({}); + const partitionIds = await producerClient.getPartitionIds(); // ensure we have at least 3 partitions partitionIds.length.should.gte( 3, @@ -1055,7 +1066,7 @@ describe("Event Processor", function(): void { const processorByName: Dictionary = {}; const checkpointStore = new InMemoryCheckpointStore(); - const partitionIds = await client.getPartitionIds({}); + const partitionIds = await producerClient.getPartitionIds(); const partitionOwnershipArr = new Set(); const partitionResultsMap = new Map< @@ -1092,7 +1103,9 @@ describe("Event Processor", function(): void { // create messages const expectedMessagePrefix = "EventProcessor test - multiple partitions - "; for (const partitionId of partitionIds) { - await producerClient.sendBatch([{ body: expectedMessagePrefix + partitionId }], { partitionId }); + await producerClient.sendBatch([{ body: expectedMessagePrefix + partitionId }], { + partitionId + }); } const processor1LoadBalancingInterval = { @@ -1108,7 +1121,7 @@ describe("Event Processor", function(): void { processorByName[`processor-1`] = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], new FooPartitionProcessor(), checkpointStore, { @@ -1130,7 +1143,7 @@ describe("Event Processor", function(): void { processorByName[`processor-2`] = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], new FooPartitionProcessor(), checkpointStore, { @@ -1157,8 +1170,8 @@ describe("Event Processor", function(): void { } const partitionOwnership = await checkpointStore.listOwnership( - client.fullyQualifiedNamespace, - client.eventHubName, + consumerClient.fullyQualifiedNamespace, + consumerClient.eventHubName, EventHubConsumerClient.defaultConsumerGroupName ); @@ -1205,7 +1218,7 @@ describe("Event Processor", function(): void { void > { const processorByName: Dictionary = {}; - const partitionIds = await client.getPartitionIds({}); + const partitionIds = await producerClient.getPartitionIds(); const checkpointStore = new InMemoryCheckpointStore(); const partitionOwnershipArr = new Set(); let didError = false; @@ -1223,14 +1236,18 @@ describe("Event Processor", function(): void { // create messages const expectedMessagePrefix = "EventProcessor test - multiple partitions - "; for (const partitionId of partitionIds) { - await producerClient.sendBatch([{ body: expectedMessagePrefix + partitionId }], { partitionId }); + await producerClient.sendBatch([{ body: expectedMessagePrefix + partitionId }], { + partitionId + }); } for (let i = 0; i < 2; i++) { const processorName = `processor-${i}`; - processorByName[processorName] = new EventProcessor( + processorByName[ + processorName + ] = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], new FooPartitionProcessor(), checkpointStore, { ...defaultOptions, startPosition: earliestEventPosition } @@ -1250,8 +1267,8 @@ describe("Event Processor", function(): void { const partitionOwnershipMap: Map = new Map(); const partitionOwnership = await checkpointStore.listOwnership( - client.fullyQualifiedNamespace, - client.eventHubName, + consumerClient.fullyQualifiedNamespace, + consumerClient.eventHubName, EventHubConsumerClient.defaultConsumerGroupName ); @@ -1279,7 +1296,7 @@ describe("Event Processor", function(): void { it("should ensure that all the processors maintain a steady-state when all partitions are being processed", async function(): Promise< void > { - const partitionIds = await client.getPartitionIds({}); + const partitionIds = await producerClient.getPartitionIds(); const checkpointStore = new InMemoryCheckpointStore(); const claimedPartitionsMap = {} as { [eventProcessorId: string]: Set }; @@ -1337,7 +1354,7 @@ describe("Event Processor", function(): void { const processor1 = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], handlers, checkpointStore, eventProcessorOptions @@ -1345,7 +1362,7 @@ describe("Event Processor", function(): void { const processor2 = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, - client, + consumerClient["_context"], handlers, checkpointStore, eventProcessorOptions @@ -1438,70 +1455,6 @@ describe("Event Processor", function(): void { } }); }); - - describe("with trackLastEnqueuedEventProperties", function(): void { - it("should have lastEnqueuedEventProperties populated when trackLastEnqueuedEventProperties is set to true", async function(): Promise< - void - > { - const { startPosition } = await SubscriptionHandlerForTests.startingFromHere(producerClient); - const partitionIds = await client.getPartitionIds({}); - for (const partitionId of partitionIds) { - await producerClient.sendBatch([{ body: `Hello world - ${partitionId}` }], { partitionId }); - } - - const partitionIdsSet = new Set(); - const lastEnqueuedEventPropertiesMap: Map = new Map(); - class SimpleEventProcessor implements SubscriptionEventHandlers { - async processEvents(_events: ReceivedEventData[], context: PartitionContext) { - partitionIdsSet.add(context.partitionId); - lastEnqueuedEventPropertiesMap.set( - context.partitionId, - context.lastEnqueuedEventProperties! - ); - } - async processError() {} - } - - const processor = new EventProcessor( - EventHubConsumerClient.defaultConsumerGroupName, - client, - new SimpleEventProcessor(), - new InMemoryCheckpointStore(), - { - ...defaultOptions, - trackLastEnqueuedEventProperties: true, - processingTarget: new GreedyPartitionLoadBalancer(), - startPosition - } - ); - - processor.start(); - - while (partitionIdsSet.size !== partitionIds.length) { - await delay(1000); - } - - await processor.stop(); - - for (const partitionId of partitionIds) { - debug("Getting the partition information"); - const partitionInfo = await client.getPartitionProperties(partitionId); - debug("partition info: ", partitionInfo); - - // sanity check - no partition should report being empty since we've sent messages - // to each one - partitionInfo.isEmpty.should.be.false; - - const results = lastEnqueuedEventPropertiesMap.get(partitionId)!; - should.exist(results); - - results!.offset!.should.equal(partitionInfo.lastEnqueuedOffset); - results!.sequenceNumber!.should.equal(partitionInfo.lastEnqueuedSequenceNumber); - results!.enqueuedOn!.getTime().should.equal(partitionInfo.lastEnqueuedOnUtc.getTime()); - results!.retrievedOn!.getTime().should.be.greaterThan(Date.now() - 60000); - } - }); - }); }).timeout(90000); function ownershipListToMap(partitionOwnership: PartitionOwnership[]): Map { diff --git a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts index 6c73c86cf384..827d94ee4228 100644 --- a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts +++ b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts @@ -28,7 +28,7 @@ describe("disconnected", function() { describe("EventHubConsumerClient", function() { it("runtimeInfo work after disconnect", async () => { const client = new EventHubConsumerClient(EventHubConsumerClient.defaultConsumerGroupName, service.connectionString, service.path); - const clientConnectionContext = client["_eventHubClient"]["_context"]; + const clientConnectionContext = client["_context"]; await client.getPartitionIds({}); const originalConnectionId = clientConnectionContext.connectionId; @@ -49,7 +49,7 @@ describe("disconnected", function() { const client = new EventHubConsumerClient(EventHubConsumerClient.defaultConsumerGroupName, service.connectionString, service.path); const partitionId = "0"; const partitionProperties = await client.getPartitionProperties(partitionId); - const clientConnectionContext = client["_eventHubClient"]["_context"]; + const clientConnectionContext = client["_context"]; let subscription: Subscription | undefined; let originalConnectionId: string; diff --git a/sdk/eventhub/event-hubs/test/receiver.spec.ts b/sdk/eventhub/event-hubs/test/receiver.spec.ts index 8380fb80e992..b5f76c70c9be 100644 --- a/sdk/eventhub/event-hubs/test/receiver.spec.ts +++ b/sdk/eventhub/event-hubs/test/receiver.spec.ts @@ -2,7 +2,6 @@ // Licensed under the MIT license. import chai from "chai"; -import { v4 as uuid } from "uuid"; const should = chai.should(); import chaiAsPromised from "chai-as-promised"; chai.use(chaiAsPromised); @@ -18,23 +17,18 @@ import { EventHubProducerClient, Subscription } from "../src"; -import { EventHubClient } from "../src/impl/eventHubClient"; import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; import { AbortController } from "@azure/abort-controller"; import { EventHubConsumer } from "../src/receiver"; -import { ReceiveHandler } from "../src/receiveHandler"; const env = getEnvVars(); -describe("EventHub Receiver", function(): void { +describe("EventHubConsumerClient", function(): void { const service = { connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], path: env[EnvVarKeys.EVENTHUB_NAME] }; - const client = new EventHubClient(service.connectionString, service.path); let producerClient: EventHubProducerClient; let consumerClient: EventHubConsumerClient; - - let receiver: EventHubConsumer | undefined; let partitionIds: string[]; before("validate environment", async function(): Promise { should.exist( @@ -45,12 +39,6 @@ describe("EventHub Receiver", function(): void { env[EnvVarKeys.EVENTHUB_NAME], "define EVENTHUB_NAME in your environment before running integration tests." ); - partitionIds = await client.getPartitionIds({}); - }); - - after("close the connection", async function(): Promise { - await client.close(); - await producerClient.close(); }); beforeEach("Creating the clients", async () => { @@ -60,19 +48,15 @@ describe("EventHub Receiver", function(): void { service.connectionString, service.path ); + partitionIds = await producerClient.getPartitionIds({}); }); afterEach("Closing the clients", async () => { await producerClient.close(); await consumerClient.close(); - if (receiver && !receiver.isClosed) { - await receiver.close(); - debug("After each - Receiver closed."); - } - receiver = undefined; }); - describe("with partitionId 0 as number", function(): void { + describe("subscribe() with partitionId 0 as number", function(): void { it("should not throw an error", async function(): Promise { let subscription: Subscription | undefined; await new Promise((resolve, reject) => { @@ -97,417 +81,384 @@ describe("EventHub Receiver", function(): void { }); }); - describe("with EventPosition specified as", function(): void { - it("'from end of stream' should receive messages correctly", async function(): Promise { - const partitionId = partitionIds[0]; - debug("Creating new receiver with offset EndOfStream"); - const receiver = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - latestEventPosition - ); - const data = await receiver.receiveBatch(10, 10); - data.length.should.equal(0, "Unexpected message received when using EventPosition.fromEnd()"); - const events: EventData[] = []; - for (let i = 0; i < 10; i++) { - const ed: EventData = { - body: "Hello awesome world " + i - }; - events.push(ed); + describe("subscribe() with EventPosition specified as", function(): void { + let partitionId: string; + let eventSentBeforeSubscribe: EventData; + let eventsSentAfterSubscribe: EventData[]; + + beforeEach(async () => { + partitionId = partitionIds[0]; + + eventSentBeforeSubscribe = { + body: "Hello awesome world " + Math.random() + }; + await producerClient.sendBatch([eventSentBeforeSubscribe], { partitionId }); + + eventsSentAfterSubscribe = []; + for (let i = 0; i < 5; i++) { + eventsSentAfterSubscribe.push({ + body: "Hello awesome world " + Math.random(), + properties: { + stamp: Math.random() + } + }); } - await producerClient.sendBatch(events, { partitionId: partitionId }); - debug(">>>>>>> Sent the new messages. We should only receive these messages."); - const data2 = await receiver.receiveBatch(10, 20); - debug("received messages: ", data2); - data2.length.should.equal(10, "Failed to receive the expected nummber of messages"); - debug("Next receive on this partition should not receive any messages."); - const data3 = await receiver.receiveBatch(10, 10); - data3.length.should.equal(0, "Unexpected message received"); - await receiver.close(); }); - it("'from last enqueued sequence number' should receive messages correctly", async function(): Promise< - void - > { - const partitionId = partitionIds[0]; - const partitionInfo = await client.getPartitionProperties(partitionId); - debug("Creating a receiver with last enqueued sequence number"); - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - sequenceNumber: partitionInfo.lastEnqueuedSequenceNumber + it("'from end of stream' should receive messages correctly", async function(): Promise { + let subscription: Subscription | undefined; + let processEventsCalled = false; + const eventsReceived: ReceivedEventData[] = []; + + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + if (!processEventsCalled) { + processEventsCalled = true; + should.equal(data.length, 0, "Received events when none were sent yet."); + await producerClient.sendBatch(eventsSentAfterSubscribe, { partitionId }); + return; + } + eventsReceived.push(...data); + if (eventsReceived.length === eventsSentAfterSubscribe.length) { + resolve(); + } + }, + processError: async (err) => { + reject(err); + } + }, + { + startPosition: latestEventPosition, + maxWaitTimeInSeconds: 30 + } + ); }); - const data = await receiver.receiveBatch(10, 10); - data.length.should.equal( - 0, - "Unexpected message received when using EventPosition with sequenceNumber" - ); - const events: EventData[] = []; - for (let i = 0; i < 10; i++) { - const ed: EventData = { - body: "Hello awesome world " + i - }; - events.push(ed); + await subscription!.close(); + + if (eventsReceived.find((event) => event.body === eventSentBeforeSubscribe.body)) { + should.fail("Received event sent before subscribe call with latestEventPosition."); } - await producerClient.sendBatch(events, { partitionId: partitionId }); - debug(">>>>>>> Sent 10 messages. We should only receive these 10 messages."); - const data2 = await receiver.receiveBatch(10, 20); - debug("received messages: ", data2); - data2.length.should.equal(10, "Failed to receive the expected nummber of messages"); - await receiver.close(); - }); - it("'after a particular offset' should receive messages correctly", async function(): Promise< - void - > { - const partitionId = partitionIds[0]; - const pInfo = await client.getPartitionProperties(partitionId); - debug(`Creating new receiver with last enqueued offset: "${pInfo.lastEnqueuedOffset}".`); - debug("Establishing the receiver link..."); - // send a new message. We should only receive this new message. - const uid = uuid(); - const ed: EventData = { - body: "New message after last enqueued offset", - properties: { - stamp: uid - } - }; - await producerClient.sendBatch([ed], { partitionId: partitionId }); - debug( - "Sent the new message after creating the receiver. We should only receive this message." + should.equal( + eventsReceived.length, + eventsSentAfterSubscribe.length, + "Not received the same number of events that were sent." ); - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - offset: pInfo.lastEnqueuedOffset - }); - const data = await receiver.receiveBatch(10, 20); - debug("received messages: ", data); - data.length.should.equal(1); - data[0].properties!.stamp.should.equal(uid); + for (let i = 0; i < eventsSentAfterSubscribe.length; i++) { + eventsReceived[i].body.should.equal(eventsSentAfterSubscribe[i].body); + eventsReceived[i].properties!.stamp.should.equal( + eventsSentAfterSubscribe[i].properties!.stamp + ); + } }); - it("'from a particular enqueued time' should receive messages correctly", async function(): Promise< + it("'after a particular sequence number' should receive messages correctly", async function(): Promise< void > { - const partitionId = partitionIds[0]; - const pInfo = await client.getPartitionProperties(partitionId); - debug(`Creating new receiver with last enqueued time: "${pInfo.lastEnqueuedOnUtc}".`); - debug("Establishing the receiver link..."); - - // send a new message. We should only receive this new message. - const uid = uuid(); - const ed: EventData = { - body: "New message after last enqueued time " + pInfo.lastEnqueuedOnUtc, - properties: { - stamp: uid - } - }; - await producerClient.sendBatch([ed], { partitionId: partitionId }); - debug( - "Sent the new message after creating the receiver. We should only receive this message." - ); + const partitionInfo = await consumerClient.getPartitionProperties(partitionId); + let subscription: Subscription | undefined; + let processEventsCalled = false; + const eventsReceived: ReceivedEventData[] = []; - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - enqueuedOn: pInfo.lastEnqueuedOnUtc + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + if (!processEventsCalled) { + processEventsCalled = true; + should.equal(data.length, 0, "Received events when none were sent yet."); + await producerClient.sendBatch(eventsSentAfterSubscribe, { partitionId }); + return; + } + eventsReceived.push(...data); + if (eventsReceived.length === eventsSentAfterSubscribe.length) { + resolve(); + } + }, + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { sequenceNumber: partitionInfo.lastEnqueuedSequenceNumber }, + maxWaitTimeInSeconds: 30 + } + ); }); - const data = await receiver.receiveBatch(10, 20); - debug("received messages: ", data); - data.length.should.equal(1, "Failed to received the expected single message"); - data[0].properties!.stamp.should.equal(uid); - }); + await subscription!.close(); - it("'after the particular sequence number' should receive messages correctly", async function(): Promise< - void - > { - const partitionId = partitionIds[0]; - const pInfo = await client.getPartitionProperties(partitionId); - // send a new message. We should only receive this new message. - const uid = uuid(); - const ed: EventData = { - body: "New message after last enqueued sequence number " + pInfo.lastEnqueuedSequenceNumber, - properties: { - stamp: uid - } - }; - await producerClient.sendBatch([ed], { partitionId: partitionId }); - debug( - "Sent the new message after getting the partition runtime information. We should only receive this message." - ); - debug( - `Creating new receiver with last enqueued sequence number: "${pInfo.lastEnqueuedSequenceNumber}".` + if (eventsReceived.find((event) => event.body === eventSentBeforeSubscribe.body)) { + should.fail("Received event sent before subscribe call with last sequence number."); + } + + should.equal( + eventsReceived.length, + eventsSentAfterSubscribe.length, + "Not received the same number of events that were sent." ); - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - sequenceNumber: pInfo.lastEnqueuedSequenceNumber - }); - const data = await receiver.receiveBatch(1, 20); - debug("received messages: ", data); - data.length.should.equal(1, "Failed to receive the expected single message"); - data[0].properties!.stamp.should.equal(uid, "Received message has unexpected uid"); - debug("Next receive on this partition should not receive any messages."); - const data2 = await receiver.receiveBatch(1, 1); - data2.length.should.equal(0, "Unexpected message received"); + for (let i = 0; i < eventsSentAfterSubscribe.length; i++) { + eventsReceived[i].body.should.equal(eventsSentAfterSubscribe[i].body); + eventsReceived[i].properties!.stamp.should.equal( + eventsSentAfterSubscribe[i].properties!.stamp + ); + } }); - it("'after the particular sequence number' with isInclusive true should receive messages correctly", async function(): Promise< + it("'after a particular sequence number' with isInclusive should receive messages correctly", async function(): Promise< void > { - const partitionId = partitionIds[0]; - const uid = uuid(); - const ed: EventData = { - body: "New message before getting the last sequence number", - properties: { - stamp: uid - } - }; - await producerClient.sendBatch([ed], { partitionId: partitionId }); - debug(`Sent message 1 with stamp: ${uid}.`); - const pInfo = await client.getPartitionProperties(partitionId); - const uid2 = uuid(); - const ed2: EventData = { - body: "New message after the last enqueued offset", - properties: { - stamp: uid2 - } - }; - await producerClient.sendBatch([ed2], { partitionId: partitionId }); - debug(`Sent message 2 with stamp: ${uid}.`); - debug( - `Creating new receiver with last sequence number: "${pInfo.lastEnqueuedSequenceNumber}".` - ); - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - sequenceNumber: pInfo.lastEnqueuedSequenceNumber, - isInclusive: true - }); - debug("We should receive the last 2 messages."); - const data = await receiver.receiveBatch(2, 30); - debug("received messages: ", data); - data.length.should.equal(2, "Failed to received two expected messages"); - data[0].properties!.stamp.should.equal(uid, "Message 1 has unexpected uid"); - data[1].properties!.stamp.should.equal(uid2, "Message 2 has unexpected uid"); - debug("Next receive on this partition should not receive any messages."); - const data2 = await receiver.receiveBatch(1, 1); - data2.length.should.equal(0, "Unexpected message received"); - }); - }); - - describe("in streaming mode", function(): void { - it("should receive messages correctly", async function(): Promise { - const partitionId = partitionIds[0]; - const pInfo = await client.getPartitionProperties(partitionId); - - // send a message that can be received - await producerClient.sendBatch([{ body: "receive behaves correctly" }], { partitionId }); + const partitionInfo = await consumerClient.getPartitionProperties(partitionId); + let subscription: Subscription | undefined; + let processEventsCalled = false; + const eventsReceived: ReceivedEventData[] = []; - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - sequenceNumber: pInfo.lastEnqueuedSequenceNumber - }); + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + if (!processEventsCalled) { + processEventsCalled = true; + should.equal(data.length, 1, "Expected 1 event sent right before subscribe call."); + should.equal( + data[0].body, + eventSentBeforeSubscribe.body, + "Should have received only the 1 event sent right before subscribe call." + ); + + await producerClient.sendBatch(eventsSentAfterSubscribe, { partitionId }); + return; + } - const received: ReceivedEventData[] = await new Promise((resolve, reject) => { - let shouldStop = false; - const events: ReceivedEventData[] = []; - - const handler = receiver!.receive((event) => { - if (!shouldStop) { - events.push(event); - shouldStop = true; - handler - .stop() - .then(() => resolve(events)) - .catch(reject); + eventsReceived.push(...data); + if (eventsReceived.length === eventsSentAfterSubscribe.length) { + resolve(); + } + }, + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { + sequenceNumber: partitionInfo.lastEnqueuedSequenceNumber, + isInclusive: true + }, + maxWaitTimeInSeconds: 30 } - }, reject); + ); }); + await subscription!.close(); - received.length.should.equal(1); - }); - - it("should support being cancelled", async function(): Promise { - const partitionId = partitionIds[0]; - const time = Date.now(); + should.equal( + eventsReceived.length, + eventsSentAfterSubscribe.length, + "Not received the same number of events that were sent." + ); - // send a message that can be received - await producerClient.sendBatch([{ body: "receive cancellation - timeout 0" }], { partitionId }); + for (let i = 0; i < eventsSentAfterSubscribe.length; i++) { + eventsReceived[i].body.should.equal(eventsSentAfterSubscribe[i].body); + eventsReceived[i].properties!.stamp.should.equal( + eventsSentAfterSubscribe[i].properties!.stamp + ); + } + }); - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - enqueuedOn: time - }); + it("'after a particular offset' should receive messages correctly", async function(): Promise< + void + > { + const partitionInfo = await consumerClient.getPartitionProperties(partitionId); + let subscription: Subscription | undefined; + let processEventsCalled = false; + const eventsReceived: ReceivedEventData[] = []; - try { - await new Promise((resolve, reject) => { - let shouldStop = false; - const events: ReceivedEventData[] = []; - // abortSignal event listeners will be triggered after synchronous paths are executed - const abortSignal = AbortController.timeout(0); - - const handler = receiver!.receive( - (event) => { - if (!shouldStop) { - events.push(event); - shouldStop = true; - handler - .stop() - .then(() => resolve(events)) - .catch(reject); + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + if (!processEventsCalled) { + processEventsCalled = true; + should.equal(data.length, 0, "Received events when none were sent yet."); + await producerClient.sendBatch(eventsSentAfterSubscribe, { partitionId }); + return; + } + eventsReceived.push(...data); + if (eventsReceived.length === eventsSentAfterSubscribe.length) { + resolve(); } }, - reject, - abortSignal - ); - }); - throw new Error(`Test failure`); - } catch (err) { - err.name.should.equal("AbortError"); - err.message.should.equal("The receive operation has been cancelled by the user."); + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { offset: partitionInfo.lastEnqueuedOffset }, + maxWaitTimeInSeconds: 30 + } + ); + }); + await subscription!.close(); + + if (eventsReceived.find((event) => event.body === eventSentBeforeSubscribe.body)) { + should.fail("Received event sent before subscribe call with last offset."); + } + + should.equal( + eventsReceived.length, + eventsSentAfterSubscribe.length, + "Not received the same number of events that were sent." + ); + for (let i = 0; i < eventsSentAfterSubscribe.length; i++) { + eventsReceived[i].body.should.equal(eventsSentAfterSubscribe[i].body); + eventsReceived[i].properties!.stamp.should.equal( + eventsSentAfterSubscribe[i].properties!.stamp + ); } }); - it("should support being cancelled from an already aborted AbortSignal", async function(): Promise< + it("'after a particular offset' with isInclusive should receive messages correctly", async function(): Promise< void > { - const partitionId = partitionIds[0]; - const time = Date.now(); - - // send a message that can be received - await producerClient.sendBatch([{ body: "receive cancellation - immediate" }], { partitionId }); + const partitionInfo = await consumerClient.getPartitionProperties(partitionId); + let subscription: Subscription | undefined; + let processEventsCalled = false; + const eventsReceived: ReceivedEventData[] = []; - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - enqueuedOn: time - }); + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + if (!processEventsCalled) { + processEventsCalled = true; + should.equal(data.length, 1, "Expected 1 event sent right before subscribe call."); + should.equal( + data[0].body, + eventSentBeforeSubscribe.body, + "Should have received only the 1 event sent right before subscribe call." + ); + + await producerClient.sendBatch(eventsSentAfterSubscribe, { + partitionId + }); + return; + } - try { - await new Promise((resolve, reject) => { - let shouldStop = false; - const events: ReceivedEventData[] = []; - - // create an AbortSignal that's in the aborted state - const abortController = new AbortController(); - abortController.abort(); - - const handler = receiver!.receive( - (event) => { - if (!shouldStop) { - events.push(event); - shouldStop = true; - handler - .stop() - .then(() => resolve(events)) - .catch(reject); + eventsReceived.push(...data); + if (eventsReceived.length === eventsSentAfterSubscribe.length) { + resolve(); } }, - reject, - abortController.signal - ); - }); - throw new Error(`Test failure`); - } catch (err) { - err.name.should.equal("AbortError"); - err.message.should.equal("The receive operation has been cancelled by the user."); + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { + offset: partitionInfo.lastEnqueuedOffset, + isInclusive: true + }, + maxWaitTimeInSeconds: 30 + } + ); + }); + await subscription!.close(); + + should.equal( + eventsReceived.length, + eventsSentAfterSubscribe.length, + "Not received the same number of events that were sent." + ); + + for (let i = 0; i < eventsSentAfterSubscribe.length; i++) { + eventsReceived[i].body.should.equal(eventsSentAfterSubscribe[i].body); + eventsReceived[i].properties!.stamp.should.equal( + eventsSentAfterSubscribe[i].properties!.stamp + ); } }); - it("should not support creating a new handler after cancellation", async function(): Promise< + it("'after a particular enqueued time' should receive messages correctly", async function(): Promise< void > { - const partitionId = partitionIds[0]; - const time = Date.now(); - - // send a message that can be received - await producerClient.sendBatch([{ body: "receive cancellation - immediate" }], { partitionId }); - - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - enqueuedOn: time - }); + const partitionInfo = await consumerClient.getPartitionProperties(partitionId); + let subscription: Subscription | undefined; + let processEventsCalled = false; + const eventsReceived: ReceivedEventData[] = []; - try { - await new Promise((resolve, reject) => { - let shouldStop = false; - const events: ReceivedEventData[] = []; - - // create an AbortSignal that's in the aborted state - const abortController = new AbortController(); - abortController.abort(); - - const handler = receiver!.receive( - (event) => { - if (!shouldStop) { - events.push(event); - shouldStop = true; - handler - .stop() - .then(() => resolve(events)) - .catch(reject); + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + if (!processEventsCalled) { + processEventsCalled = true; + should.equal(data.length, 0, "Received events when none were sent yet."); + await producerClient.sendBatch(eventsSentAfterSubscribe, { + partitionId + }); + return; } - }, - reject, - abortController.signal - ); - }); - throw new Error(`Test failure`); - } catch (err) { - err.name.should.equal("AbortError"); - err.message.should.equal("The receive operation has been cancelled by the user."); - const events: ReceivedEventData[] = []; - try { - await new Promise((resolve, reject) => { - let shouldStop = false; - - const handler = receiver!.receive((event) => { - if (!shouldStop) { - events.push(event); - shouldStop = true; - handler - .stop() - .then(() => resolve(events)) - .catch(reject); + eventsReceived.push(...data); + if (eventsReceived.length === eventsSentAfterSubscribe.length) { + resolve(); } - }, reject); - }); - } catch (err) { - err.message.should.match( - /The EventHubConsumer for ".+" has been closed and can no longer be used.*/gi - ); - } + }, + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { enqueuedOn: partitionInfo.lastEnqueuedOnUtc }, + maxWaitTimeInSeconds: 30 + } + ); + }); + await subscription!.close(); - events.length.should.equal(0); + if (eventsReceived.find((event) => event.body === eventSentBeforeSubscribe.body)) { + should.fail("Received event sent before subscribe call with last offset."); } - }); - }); - describe("in batch mode", function(): void { - it("should receive messages correctly", async function(): Promise { - const partitionId = partitionIds[0]; - receiver = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - earliestEventPosition - ); - const data = await receiver.receiveBatch(5, 10); - debug("received messages: ", data); - data.length.should.equal(5, "Failed to receive five expected messages"); - }); - - it("should receive messages correctly with maxRetries 0", async function(): Promise { - const partitionId = partitionIds[0]; - receiver = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - earliestEventPosition, - { retryOptions: { maxRetries: 0 } } + should.equal( + eventsReceived.length, + eventsSentAfterSubscribe.length, + "Not received the same number of events that were sent." ); - const data = await receiver.receiveBatch(5, 10); - debug("received messages: ", data); - data.length.should.equal(5, "Failed to receive five expected messages"); + for (let i = 0; i < eventsSentAfterSubscribe.length; i++) { + eventsReceived[i].body.should.equal(eventsSentAfterSubscribe[i].body); + eventsReceived[i].properties!.stamp.should.equal( + eventsSentAfterSubscribe[i].properties!.stamp + ); + } }); + }); + describe("EventHubConsumer receiveBatch", function(): void { it("should support being cancelled", async function(): Promise { const partitionId = partitionIds[0]; const time = Date.now(); // send a message that can be received - await producerClient.sendBatch([{ body: "batchReceiver cancellation - timeout 0" }], { partitionId }); - - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - enqueuedOn: time + await producerClient.sendBatch([{ body: "batchReceiver cancellation - timeout 0" }], { + partitionId }); + const receiver = new EventHubConsumer( + consumerClient["_context"], + EventHubConsumerClient.defaultConsumerGroupName, + partitionId, + { + enqueuedOn: time + } + ); + try { // abortSignal event listeners will be triggered after synchronous paths are executed const abortSignal = AbortController.timeout(0); @@ -517,6 +468,8 @@ describe("EventHub Receiver", function(): void { err.name.should.equal("AbortError"); err.message.should.equal("The receive operation has been cancelled by the user."); } + + await receiver.close(); }); it("should support being cancelled from an already aborted AbortSignal", async function(): Promise< @@ -526,12 +479,19 @@ describe("EventHub Receiver", function(): void { const time = Date.now(); // send a message that can be received - await producerClient.sendBatch([{ body: "batchReceiver cancellation - immediate" }], { partitionId }); - - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - enqueuedOn: time + await producerClient.sendBatch([{ body: "batchReceiver cancellation - immediate" }], { + partitionId }); + const receiver = new EventHubConsumer( + consumerClient["_context"], + EventHubConsumerClient.defaultConsumerGroupName, + partitionId, + { + enqueuedOn: time + } + ); + try { // abortSignal event listeners will be triggered after synchronous paths are executed const abortController = new AbortController(); @@ -542,6 +502,8 @@ describe("EventHub Receiver", function(): void { err.name.should.equal("AbortError"); err.message.should.equal("The receive operation has been cancelled by the user."); } + + await receiver.close(); }); it("should support cancellation when a connection already exists", async function(): Promise< @@ -551,12 +513,19 @@ describe("EventHub Receiver", function(): void { const time = Date.now(); // send a message that can be received - await producerClient.sendBatch([{ body: "batchReceiver cancellation - timeout 0" }], { partitionId }); - - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - enqueuedOn: time + await producerClient.sendBatch([{ body: "batchReceiver cancellation - timeout 0" }], { + partitionId }); + const receiver = new EventHubConsumer( + consumerClient["_context"], + EventHubConsumerClient.defaultConsumerGroupName, + partitionId, + { + enqueuedOn: time + } + ); + try { // call receiveBatch once to establish a connection await receiver.receiveBatch(1, 60); @@ -568,6 +537,8 @@ describe("EventHub Receiver", function(): void { err.name.should.equal("AbortError"); err.message.should.equal("The receive operation has been cancelled by the user."); } + + await receiver.close(); }); it("should not support calling receiveBatch after a cancellation", async function(): Promise< @@ -577,12 +548,19 @@ describe("EventHub Receiver", function(): void { const time = Date.now(); // send a message that can be received - await producerClient.sendBatch([{ body: "batchReceiver post-cancellation - timeout 0" }], { partitionId }); - - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - enqueuedOn: time + await producerClient.sendBatch([{ body: "batchReceiver post-cancellation - timeout 0" }], { + partitionId }); + const receiver = new EventHubConsumer( + consumerClient["_context"], + EventHubConsumerClient.defaultConsumerGroupName, + partitionId, + { + enqueuedOn: time + } + ); + try { // abortSignal event listeners will be triggered after synchronous paths are executed const abortSignal = AbortController.timeout(0); @@ -600,275 +578,55 @@ describe("EventHub Receiver", function(): void { ); } } + + await receiver.close(); }); }); - describe("with trackLastEnqueuedEventProperties", function(): void { + describe("subscribe() with trackLastEnqueuedEventProperties", function(): void { it("should have lastEnqueuedEventProperties populated", async function(): Promise { const partitionId = partitionIds[0]; - for (let i = 0; i < 10; i++) { - const ed: EventData = { - body: "Hello awesome world " + i - }; - await producerClient.sendBatch([ed], { partitionId }); - debug("sent message - " + i); - } - debug("Getting the partition information"); - const pInfo = await client.getPartitionProperties(partitionId); - debug("partition info: ", pInfo); - debug("Creating new receiver with offset EndOfStream"); - receiver = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - earliestEventPosition, - { - trackLastEnqueuedEventProperties: true - } - ); - const data = await receiver.receiveBatch(1, 10); - debug("receiver.runtimeInfo ", receiver.lastEnqueuedEventProperties); - data.length.should.equal(1); - should.exist(receiver.lastEnqueuedEventProperties); - receiver.lastEnqueuedEventProperties!.offset!.should.equal(pInfo.lastEnqueuedOffset); - receiver.lastEnqueuedEventProperties!.sequenceNumber!.should.equal( - pInfo.lastEnqueuedSequenceNumber - ); - receiver - .lastEnqueuedEventProperties!.enqueuedOn!.getTime() - .should.equal(pInfo.lastEnqueuedOnUtc.getTime()); - receiver - .lastEnqueuedEventProperties!.retrievedOn!.getTime() - .should.be.greaterThan(Date.now() - 60000); - }); - }); + const eventData = { body: "Hello awesome world " + Math.random() }; + await producerClient.sendBatch([eventData], { partitionId }); + debug("sent: ", eventData); - describe("with ownerLevel", function(): void { - it("should behave correctly when a receiver with lower ownerLevel value is connected after a receiver with higher ownerLevel value to a partition in a consumer group", function(done: Mocha.Done): void { - const partitionId = partitionIds[0]; - let ownerLevelRcvr1: ReceiveHandler; - let ownerLevelRcvr2: ReceiveHandler; - const onError = (error: MessagingError | Error) => { - debug(">>>> ownerLevel Receiver 1", error); - throw new Error( - "An Error should not have happened for ownerLevel receiver with ownerLevel value 2." - ); - }; - const onMsg = (data: ReceivedEventData) => { - debug(">>>> ownerLevel Receiver 1", data); - }; - const receiver1 = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - latestEventPosition, - { - ownerLevel: 2 - } - ); - ownerLevelRcvr1 = receiver1.receive(onMsg, onError); - debug("Created ownerLevel receiver 1 %s", ownerLevelRcvr1); - setTimeout(() => { - const onError2 = (error: MessagingError | Error) => { - debug(">>>> ownerLevel Receiver 2", error); - should.exist(error); - should.equal((error as any).code, "ReceiverDisconnectedError"); - ownerLevelRcvr2 - .stop() - .then(() => receiver2.close()) - .then(() => ownerLevelRcvr1.stop()) - .then(() => receiver1.close()) - .then(() => { - debug("Successfully closed the ownerLevel receivers 1 and 2."); - done(); - }) - .catch((err) => { - debug("error occurred while closing the receivers... ", err); - done(); - }); - }; - const onMsg2 = (data: ReceivedEventData) => { - debug(">>>> ownerLevel Receiver 2", data); - }; - const receiver2 = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - latestEventPosition, - { - ownerLevel: 1 - } - ); - ownerLevelRcvr2 = receiver2.receive(onMsg2, onError2); - debug("Created ownerLevel receiver 2 %s", ownerLevelRcvr2); - }, 3000); - }); + const pInfo = await consumerClient.getPartitionProperties(partitionId); + debug("partition info: ", pInfo); - it("should behave correctly when a receiver with higher ownerLevel value is connected after a receiver with lower ownerLevel value to a partition in a consumer group", function(done: Mocha.Done): void { - const partitionId = partitionIds[0]; - let ownerLevelRcvr1: ReceiveHandler; - let ownerLevelRcvr2: ReceiveHandler; - let receiver2: EventHubConsumer; - const onError = (error: MessagingError | Error) => { - debug(">>>> ownerLevel Receiver 1", error); - should.exist(error); - should.equal((error as any).code, "ReceiverDisconnectedError"); - ownerLevelRcvr1 - .stop() - .then(() => receiver1.close()) - .then(() => ownerLevelRcvr2.stop()) - .then(() => receiver2.close()) - .then(() => { - debug("Successfully closed the ownerLevel receivers 1 and 2."); - done(); - }) - .catch((err) => { - debug("error occurred while closing the receivers... ", err); - done(); - }); - }; - const onMsg = (data: ReceivedEventData) => { - debug(">>>> ownerLevel Receiver 1", data); - }; - const receiver1 = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - latestEventPosition, - { - ownerLevel: 1 - } - ); - ownerLevelRcvr1 = receiver1.receive(onMsg, onError); - debug("Created ownerLevel receiver 1 %s", ownerLevelRcvr1); - setTimeout(() => { - const onError2 = (error: MessagingError | Error) => { - debug(">>>> ownerLevel Receiver 2", error); - throw new Error( - "An Error should not have happened for ownerLevel receiver with ownerLevel value 2." - ); - }; - const onMsg2 = (data: ReceivedEventData) => { - debug(">>>> ownerLevel Receiver 2", data); - }; - receiver2 = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, + let subscription: Subscription | undefined; + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( partitionId, - latestEventPosition, { - ownerLevel: 2 - } - ); - ownerLevelRcvr2 = receiver2.receive(onMsg2, onError2); - debug("Created ownerLevel receiver 2 %s", ownerLevelRcvr2); - }, 3000); - }); - - it("should behave correctly when a non ownerLevel receiver is created after an ownerLevel receiver", function(done: Mocha.Done): void { - const partitionId = partitionIds[0]; - let ownerLevelRcvr: ReceiveHandler; - let nonownerLevelRcvr: ReceiveHandler; - const onerr1 = (error: MessagingError | Error) => { - debug(">>>> ownerLevel Receiver ", error); - throw new Error( - "An Error should not have happened for ownerLevel receiver with ownerLevel value 1." - ); - }; - const onmsg1 = (data: ReceivedEventData) => { - debug(">>>> ownerLevel Receiver ", data); - }; - const receiver1 = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - latestEventPosition, - { - ownerLevel: 1 - } - ); - ownerLevelRcvr = receiver1.receive(onmsg1, onerr1); - debug("Created ownerLevel receiver %s", ownerLevelRcvr); - const onerr2 = (error: MessagingError | Error) => { - debug(">>>> non ownerLevel Receiver", error); - should.exist(error); - should.equal((error as any).code, "ReceiverDisconnectedError"); - nonownerLevelRcvr - .stop() - .then(() => receiver2.close()) - .then(() => ownerLevelRcvr.stop()) - .then(() => receiver1.close()) - .then(() => { - debug("Successfully closed the nonownerLevel and ownerLevel receivers"); - done(); - }) - .catch((err) => { - debug("error occurred while closing the receivers... ", err); - done(); - }); - }; - const onmsg2 = (data: ReceivedEventData) => { - debug(">>>> non ownerLevel Receiver", data); - }; - const receiver2 = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - latestEventPosition - ); - nonownerLevelRcvr = receiver2.receive(onmsg2, onerr2); - debug("Created non ownerLevel receiver %s", nonownerLevelRcvr); - }); + processEvents: async (data, context) => { + data.length.should.equal(1); + should.exist(context.lastEnqueuedEventProperties); + context.lastEnqueuedEventProperties!.offset!.should.equal(pInfo.lastEnqueuedOffset); + context.lastEnqueuedEventProperties!.sequenceNumber!.should.equal( + pInfo.lastEnqueuedSequenceNumber + ); + context + .lastEnqueuedEventProperties!.enqueuedOn!.getTime() + .should.equal(pInfo.lastEnqueuedOnUtc.getTime()); + context + .lastEnqueuedEventProperties!.retrievedOn!.getTime() + .should.be.greaterThan(Date.now() - 60000); - it("should behave correctly when an ownerLevel receiver is created after a non ownerLevel receiver", function(done: Mocha.Done): void { - const partitionId = partitionIds[0]; - let ownerLevelRcvr: ReceiveHandler; - let nonownerLevelRcvr: ReceiveHandler; - let receiver1: EventHubConsumer; - let receiver2: EventHubConsumer; - const onerr3 = (error: MessagingError | Error) => { - debug(">>>> non ownerLevel Receiver", error); - should.exist(error); - should.equal((error as any).code, "ReceiverDisconnectedError"); - nonownerLevelRcvr - .stop() - .then(() => receiver1.close()) - .then(() => ownerLevelRcvr.stop()) - .then(() => receiver2.close()) - .then(() => { - debug("Successfully closed the nonownerLevel and ownerLevel receivers"); - done(); - }) - .catch((err) => { - debug("error occurred while closing the receivers... ", err); - done(); - }); - }; - const onmsg3 = (data: ReceivedEventData) => { - debug(">>>> non ownerLevel Receiver", data); - }; - receiver1 = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - latestEventPosition - ); - nonownerLevelRcvr = receiver1.receive(onmsg3, onerr3); - debug("Created non ownerLevel receiver %s", nonownerLevelRcvr); - setTimeout(() => { - const onerr4 = (error: MessagingError | Error) => { - debug(">>>> ownerLevel Receiver ", error); - throw new Error( - "OnErr4 >> An Error should not have happened for ownerLevel receiver with ownerLevel value 1." - ); - }; - const onmsg4 = (data: ReceivedEventData) => { - debug(">>>> ownerLevel Receiver ", data); - }; - receiver2 = client.createConsumer( - EventHubConsumerClient.defaultConsumerGroupName, - partitionId, - latestEventPosition, + resolve(); + }, + processError: async (err) => { + reject(err); + } + }, { - ownerLevel: 1 + startPosition: earliestEventPosition, + maxBatchSize: 1, + trackLastEnqueuedEventProperties: true } ); - ownerLevelRcvr = receiver2.receive(onmsg4, onerr4); - debug("Created ownerLevel receiver %s", ownerLevelRcvr); - }, 3000); + }); + await subscription!.close(); }); });