diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 8d8e89254886..c8cbcc4828de 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -67,6 +67,30 @@ packages: dev: false resolution: integrity: sha512-RVG1Ad3Afv9gwFFmpeCXQAm+Sa0L8KEZRJJAAZEGoYDb6EoO1iQDVmoBz720h8mdrGpi0D60xNU/KhriIwuZfQ== + /@azure/core-amqp/1.1.4: + dependencies: + '@azure/abort-controller': 1.0.1 + '@azure/core-auth': 1.1.3 + '@azure/logger': 1.0.0 + '@types/async-lock': 1.1.2 + '@types/is-buffer': 2.0.0 + async-lock: 1.2.4 + buffer: 5.6.0 + events: 3.1.0 + is-buffer: 2.0.4 + jssha: 3.1.0 + process: 0.11.10 + rhea: 1.0.23 + rhea-promise: 1.0.0 + stream-browserify: 3.0.0 + tslib: 2.0.0 + url: 0.11.0 + util: 0.12.3 + dev: false + engines: + node: '>=8.0.0' + resolution: + integrity: sha512-1kPDQMOYcmVRMoe9wAx4tqcM5MlkgCWeIq5gfu8u1dK9UWbVy3mDP9OQJOTZJxccOF1AKaJ7yGQhM+uNrSmwog== /@azure/core-asynciterator-polyfill/1.0.0: dev: false resolution: @@ -155,6 +179,22 @@ packages: dev: false resolution: integrity: sha512-CxaMaEjwtsmIhWtjHyGimKO7RmES0YxPqGQ9+jKqGygNlhG5NYHktDaiQu6w7k3g+I51VaLXtVSt+BVFd6VWfQ== + /@azure/event-hubs/5.2.2: + dependencies: + '@azure/abort-controller': 1.0.1 + '@azure/core-amqp': 1.1.4 + '@azure/core-asynciterator-polyfill': 1.0.0 + '@azure/core-tracing': 1.0.0-preview.8 + '@azure/logger': 1.0.0 + '@opentelemetry/api': 0.6.1 + buffer: 5.6.0 + process: 0.11.10 + rhea-promise: 1.0.0 + tslib: 2.0.0 + uuid: 8.2.0 + dev: false + resolution: + integrity: sha512-F/1jaTC9NxgNjMkO7SAs9Q9BndJ16AtRwQu0l21FNyRCN8kWl4Noiblsbsjtv+BPYa+ARrocR5POMlJ5eveR9w== /@azure/logger-js/1.3.2: dependencies: tslib: 1.13.0 @@ -8284,6 +8324,7 @@ packages: version: 0.0.0 'file:projects/eventhubs-checkpointstore-blob.tgz': dependencies: + '@azure/event-hubs': 5.2.2 '@azure/storage-blob': 12.1.2 '@microsoft/api-extractor': 7.7.11 '@rollup/plugin-commonjs': 11.0.2_rollup@1.32.1 @@ -8344,7 +8385,7 @@ packages: dev: false name: '@rush-temp/eventhubs-checkpointstore-blob' resolution: - integrity: sha512-a2VkvB0VXuppKkDffruxfSgqSFzDSxaY0eIrKr6mLVXE2dhmttnQUqt/v8Rhfam0e9YQZ78STghOi9eyO3/oKg== + integrity: sha512-yAtH+PkS66Vya1u6rFOuKprUZX+c8tVGk/jC6HFvK38X8ZmR+v1W6v9oKD0atccu4QVym87lhdEqr+u3D0Ogag== tarball: 'file:projects/eventhubs-checkpointstore-blob.tgz' version: 0.0.0 'file:projects/identity.tgz': diff --git a/sdk/eventhub/event-hubs/CHANGELOG.md b/sdk/eventhub/event-hubs/CHANGELOG.md index 5a031908605e..1b5f34f84d0c 100644 --- a/sdk/eventhub/event-hubs/CHANGELOG.md +++ b/sdk/eventhub/event-hubs/CHANGELOG.md @@ -1,5 +1,11 @@ # Release History +## 5.3.0-preview.1 (Unreleased) + +- Adds `loadBalancingOptions` to the `EventHubConsumerClient` to add control around + how aggressively the client claims partitions while load balancing. + ([PR 9706](https://github.com/Azure/azure-sdk-for-js/pull/9706)) + ## 5.2.2 (2020-06-30) - Fixes issue [#9289](https://github.com/Azure/azure-sdk-for-js/issues/9289) diff --git a/sdk/eventhub/event-hubs/package.json b/sdk/eventhub/event-hubs/package.json index f4f06268f4df..4c5c5f894286 100644 --- a/sdk/eventhub/event-hubs/package.json +++ b/sdk/eventhub/event-hubs/package.json @@ -1,7 +1,7 @@ { "name": "@azure/event-hubs", "sdk-type": "client", - "version": "5.2.2", + "version": "5.3.0-preview.1", "description": "Azure Event Hubs SDK for JS.", "author": "Microsoft Corporation", "license": "MIT", diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 0eae6f1289f4..ed3f94ed4d47 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -81,12 +81,12 @@ export interface EventHubClientOptions { // @public export class EventHubConsumerClient { - constructor(consumerGroup: string, connectionString: string, options?: EventHubClientOptions); - constructor(consumerGroup: string, connectionString: string, checkpointStore: CheckpointStore, options?: EventHubClientOptions); - constructor(consumerGroup: string, connectionString: string, eventHubName: string, options?: EventHubClientOptions); - constructor(consumerGroup: string, connectionString: string, eventHubName: string, checkpointStore: CheckpointStore, options?: EventHubClientOptions); - constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, options?: EventHubClientOptions); - constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, checkpointStore: CheckpointStore, options?: EventHubClientOptions); + constructor(consumerGroup: string, connectionString: string, options?: EventHubConsumerClientOptions); + constructor(consumerGroup: string, connectionString: string, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions); + constructor(consumerGroup: string, connectionString: string, eventHubName: string, options?: EventHubConsumerClientOptions); + constructor(consumerGroup: string, connectionString: string, eventHubName: string, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions); + constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, options?: EventHubConsumerClientOptions); + constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions); close(): Promise; static defaultConsumerGroupName: string; get eventHubName(): string; @@ -98,6 +98,11 @@ export class EventHubConsumerClient { subscribe(partitionId: string, handlers: SubscriptionEventHandlers, options?: SubscribeOptions): Subscription; } +// @public +export interface EventHubConsumerClientOptions extends EventHubClientOptions { + loadBalancingOptions?: LoadBalancingOptions; +} + // @public export class EventHubProducerClient { constructor(connectionString: string, options?: EventHubClientOptions); @@ -152,6 +157,13 @@ export interface LastEnqueuedEventProperties { // @public export const latestEventPosition: EventPosition; +// @public +export interface LoadBalancingOptions { + partitionOwnershipExpirationIntervalInMs?: number; + strategy?: "balanced" | "greedy"; + updateIntervalInMs?: number; +} + // @public export const logger: import("@azure/logger").AzureLogger; diff --git a/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts b/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts index 4189cc64675b..8820fdf94fc4 100644 --- a/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts @@ -3,14 +3,14 @@ import { ConnectionContext, createConnectionContext } from "./connectionContext"; import { - EventHubClientOptions, + EventHubConsumerClientOptions, GetEventHubPropertiesOptions, GetPartitionIdsOptions, - GetPartitionPropertiesOptions + GetPartitionPropertiesOptions, + LoadBalancingOptions } from "./models/public"; import { InMemoryCheckpointStore } from "./inMemoryCheckpointStore"; import { CheckpointStore, EventProcessor, FullEventProcessorOptions } from "./eventProcessor"; -import { GreedyPartitionLoadBalancer } from "./partitionLoadBalancer"; import { Constants, TokenCredential } from "@azure/core-amqp"; import { logger } from "./log"; @@ -24,6 +24,10 @@ import { EventHubProperties, PartitionProperties } from "./managementClient"; import { PartitionGate } from "./impl/partitionGate"; import { v4 as uuid } from "uuid"; import { validateEventPositions } from "./eventPosition"; +import { LoadBalancingStrategy } from "./loadBalancerStrategies/loadBalancingStrategy"; +import { UnbalancedLoadBalancingStrategy } from "./loadBalancerStrategies/unbalancedStrategy"; +import { GreedyLoadBalancingStrategy } from "./loadBalancerStrategies/greedyStrategy"; +import { BalancedLoadBalancingStrategy } from "./loadBalancerStrategies/balancedStrategy"; const defaultConsumerClientOptions: Required; + /** * @property * @readonly @@ -111,7 +120,11 @@ export class EventHubConsumerClient { * - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets. * - `userAgent` : A string to append to the built in user agent string that is passed to the service. */ - constructor(consumerGroup: string, connectionString: string, options?: EventHubClientOptions); // #1 + constructor( + consumerGroup: string, + connectionString: string, + options?: EventHubConsumerClientOptions + ); // #1 /** * @constructor * The `EventHubConsumerClient` class is used to consume events from an Event Hub. @@ -133,7 +146,7 @@ export class EventHubConsumerClient { consumerGroup: string, connectionString: string, checkpointStore: CheckpointStore, - options?: EventHubClientOptions + options?: EventHubConsumerClientOptions ); // #1.1 /** * @constructor @@ -154,7 +167,7 @@ export class EventHubConsumerClient { consumerGroup: string, connectionString: string, eventHubName: string, - options?: EventHubClientOptions + options?: EventHubConsumerClientOptions ); // #2 /** * @constructor @@ -179,7 +192,7 @@ export class EventHubConsumerClient { connectionString: string, eventHubName: string, checkpointStore: CheckpointStore, - options?: EventHubClientOptions + options?: EventHubConsumerClientOptions ); // #2.1 /** * @constructor @@ -202,7 +215,7 @@ export class EventHubConsumerClient { fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, - options?: EventHubClientOptions + options?: EventHubConsumerClientOptions ); // #3 /** * @constructor @@ -229,18 +242,21 @@ export class EventHubConsumerClient { eventHubName: string, credential: TokenCredential, checkpointStore: CheckpointStore, - options?: EventHubClientOptions + options?: EventHubConsumerClientOptions ); // #3.1 constructor( private _consumerGroup: string, connectionStringOrFullyQualifiedNamespace2: string, - checkpointStoreOrEventHubNameOrOptions3?: CheckpointStore | EventHubClientOptions | string, + checkpointStoreOrEventHubNameOrOptions3?: + | CheckpointStore + | EventHubConsumerClientOptions + | string, checkpointStoreOrCredentialOrOptions4?: | CheckpointStore - | EventHubClientOptions + | EventHubConsumerClientOptions | TokenCredential, - checkpointStoreOrOptions5?: CheckpointStore | EventHubClientOptions, - options6?: EventHubClientOptions + checkpointStoreOrOptions5?: CheckpointStore | EventHubConsumerClientOptions, + options6?: EventHubConsumerClientOptions ) { if (isTokenCredential(checkpointStoreOrCredentialOrOptions4)) { // #3 or 3.1 @@ -271,7 +287,7 @@ export class EventHubConsumerClient { // 2.1 this._checkpointStore = checkpointStoreOrCredentialOrOptions4; this._userChoseCheckpointStore = true; - this._clientOptions = (checkpointStoreOrOptions5 as EventHubClientOptions) || {}; + this._clientOptions = (checkpointStoreOrOptions5 as EventHubConsumerClientOptions) || {}; } else { // 2 this._checkpointStore = new InMemoryCheckpointStore(); @@ -293,13 +309,13 @@ export class EventHubConsumerClient { this._checkpointStore = checkpointStoreOrEventHubNameOrOptions3; this._userChoseCheckpointStore = true; this._clientOptions = - (checkpointStoreOrCredentialOrOptions4 as EventHubClientOptions) || {}; + (checkpointStoreOrCredentialOrOptions4 as EventHubConsumerClientOptions) || {}; } else { // 1 this._checkpointStore = new InMemoryCheckpointStore(); this._userChoseCheckpointStore = false; this._clientOptions = - (checkpointStoreOrEventHubNameOrOptions3 as EventHubClientOptions) || {}; + (checkpointStoreOrEventHubNameOrOptions3 as EventHubConsumerClientOptions) || {}; } this._context = createConnectionContext( @@ -307,6 +323,14 @@ export class EventHubConsumerClient { this._clientOptions ); } + this._loadBalancingOptions = { + // default options + strategy: "balanced", + updateIntervalInMs: 10000, + partitionOwnershipExpirationIntervalInMs: 60000, + // options supplied by user + ...this._clientOptions?.loadBalancingOptions + }; } /** @@ -466,6 +490,27 @@ export class EventHubConsumerClient { return subscription; } + /** + * Gets the LoadBalancing strategy that should be used based on what the user provided. + */ + private _getLoadBalancingStrategy(): LoadBalancingStrategy { + if (!this._userChoseCheckpointStore) { + // The default behavior when a checkpointstore isn't provided + // is to always grab all the partitions. + return new UnbalancedLoadBalancingStrategy(); + } + + const partitionOwnershipExpirationIntervalInMs = this._loadBalancingOptions + .partitionOwnershipExpirationIntervalInMs; + if (this._loadBalancingOptions?.strategy === "greedy") { + return new GreedyLoadBalancingStrategy(partitionOwnershipExpirationIntervalInMs); + } + + // The default behavior when a checkpointstore is provided is + // to grab one partition at a time. + return new BalancedLoadBalancingStrategy(partitionOwnershipExpirationIntervalInMs); + } + private createEventProcessorForAllPartitions( subscriptionEventHandlers: SubscriptionEventHandlers, options?: SubscribeOptions @@ -480,6 +525,7 @@ export class EventHubConsumerClient { logger.verbose("EventHubConsumerClient subscribing to all partitions, no checkpoint store."); } + const loadBalancingStrategy = this._getLoadBalancingStrategy(); const eventProcessor = this._createEventProcessor( this._context, subscriptionEventHandlers, @@ -488,13 +534,12 @@ export class EventHubConsumerClient { ...defaultConsumerClientOptions, ...(options as SubscribeOptions), ownerLevel: getOwnerLevel(options, this._userChoseCheckpointStore), - processingTarget: this._userChoseCheckpointStore - ? undefined - : new GreedyPartitionLoadBalancer(), // make it so all the event processors process work with the same overarching owner ID // this allows the EventHubConsumer to unify all the work for any processors that it spawns ownerId: this._id, - retryOptions: this._clientOptions.retryOptions + retryOptions: this._clientOptions.retryOptions, + loadBalancingStrategy, + loopIntervalInMs: this._loadBalancingOptions.updateIntervalInMs } ); @@ -529,7 +574,9 @@ export class EventHubConsumerClient { ...options, processingTarget: partitionId, ownerLevel: getOwnerLevel(subscribeOptions, this._userChoseCheckpointStore), - retryOptions: this._clientOptions.retryOptions + retryOptions: this._clientOptions.retryOptions, + loadBalancingStrategy: new UnbalancedLoadBalancingStrategy(), + loopIntervalInMs: this._loadBalancingOptions.updateIntervalInMs ?? 10000 } ); diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 286968dc5d1c..85925d0476ba 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -3,9 +3,8 @@ import { v4 as uuid } from "uuid"; import { PumpManager, PumpManagerImpl } from "./pumpManager"; -import { AbortController, AbortSignalLike } from "@azure/abort-controller"; +import { AbortController, AbortSignalLike, AbortError } from "@azure/abort-controller"; import { logErrorStackTrace, logger } from "./log"; -import { FairPartitionLoadBalancer, PartitionLoadBalancer } from "./partitionLoadBalancer"; import { Checkpoint, PartitionProcessor } from "./partitionProcessor"; import { SubscriptionEventHandlers } from "./eventHubConsumerClientModels"; import { EventPosition, isEventPosition, latestEventPosition } from "./eventPosition"; @@ -13,6 +12,7 @@ import { delayWithoutThrow } from "./util/delayWithoutThrow"; import { CommonEventProcessorOptions } from "./models/private"; import { CloseReason } from "./models/public"; import { ConnectionContext } from "./connectionContext"; +import { LoadBalancingStrategy } from "./loadBalancerStrategies/loadBalancingStrategy"; /** * An interface representing the details on which instance of a `EventProcessor` owns processing @@ -135,6 +135,14 @@ export interface FullEventProcessorOptions extends CommonEventProcessorOptions { * @ignore */ pumpManager?: PumpManager; + /** + * The amount of time between load balancing attempts. + */ + loopIntervalInMs: number; + /** + * A specific partition to target. + */ + processingTarget?: string; } /** @@ -176,9 +184,18 @@ export class EventProcessor { private _isRunning: boolean = false; private _loopTask?: PromiseLike; private _abortController?: AbortController; - private _processingTarget: PartitionLoadBalancer | string; - private _loopIntervalInMs = 10000; - private _inactiveTimeLimitInMs = 60000; + /** + * A specific partition to target. + */ + private _processingTarget?: string; + /** + * Determines which partitions to claim as part of load balancing. + */ + private _loadBalancingStrategy: LoadBalancingStrategy; + /** + * The amount of time between load balancing attempts. + */ + private _loopIntervalInMs: number; private _eventHubName: string; private _fullyQualifiedNamespace: string; @@ -214,12 +231,9 @@ export class EventProcessor { this._processorOptions = options; this._pumpManager = options.pumpManager || new PumpManagerImpl(this._id, this._processorOptions); - const inactiveTimeLimitInMS = options.inactiveTimeLimitInMs || this._inactiveTimeLimitInMs; - this._processingTarget = - options.processingTarget || new FairPartitionLoadBalancer(inactiveTimeLimitInMS); - if (options.loopIntervalInMs) { - this._loopIntervalInMs = options.loopIntervalInMs; - } + this._processingTarget = options.processingTarget; + this._loopIntervalInMs = options.loopIntervalInMs; + this._loadBalancingStrategy = options.loadBalancingStrategy; } /** @@ -385,11 +399,10 @@ export class EventProcessor { * When a new partition is claimed, this method is also responsible for starting a partition pump that creates an * EventHubConsumer for processing events from that partition. */ - private async _runLoopWithLoadBalancing( - loadBalancer: PartitionLoadBalancer, + loadBalancingStrategy: LoadBalancingStrategy, abortSignal: AbortSignalLike - ): Promise { + ) { let cancelLoopResolver; // This provides a mechanism for exiting the loop early // if the subscription has had `close` called. @@ -402,61 +415,14 @@ export class EventProcessor { abortSignal.addEventListener("abort", resolve); }); - // periodically check if there is any partition not being processed and process it + // Periodically check if any partitions need to be claimed and claim them. while (!abortSignal.aborted) { + const iterationStartTimeInMs = Date.now(); try { - const partitionOwnershipMap: Map = new Map(); - // Retrieve current partition ownership details from the datastore. - const partitionOwnership = await this._checkpointStore.listOwnership( - this._fullyQualifiedNamespace, - this._eventHubName, - this._consumerGroup - ); - - const abandonedMap: Map = new Map(); - - for (const ownership of partitionOwnership) { - if (isAbandoned(ownership)) { - abandonedMap.set(ownership.partitionId, ownership); - continue; - } - - partitionOwnershipMap.set(ownership.partitionId, ownership); - } const { partitionIds } = await this._context.managementSession!.getEventHubProperties({ abortSignal: abortSignal }); - - if (abortSignal.aborted) { - return; - } - - if (partitionIds.length > 0) { - const partitionsToClaim = loadBalancer.loadBalance( - this._id, - partitionOwnershipMap, - partitionIds - ); - if (partitionsToClaim) { - for (const partitionToClaim of partitionsToClaim) { - let ownershipRequest: PartitionOwnership; - - if (abandonedMap.has(partitionToClaim)) { - ownershipRequest = this._createPartitionOwnershipRequest( - abandonedMap, - partitionToClaim - ); - } else { - ownershipRequest = this._createPartitionOwnershipRequest( - partitionOwnershipMap, - partitionToClaim - ); - } - - await this._claimOwnership(ownershipRequest, abortSignal); - } - } - } + await this._performLoadBalancing(loadBalancingStrategy, partitionIds, abortSignal); } catch (err) { logger.warning( `[${this._id}] An error occured within the EventProcessor loop: ${err?.name}: ${err?.message}` @@ -465,12 +431,14 @@ export class EventProcessor { // Protect against the scenario where the user awaits on subscription.close() from inside processError. await Promise.race([this._handleSubscriptionError(err), cancelLoopPromise]); } finally { - // sleep for some time, then continue the loop again. + // Sleep for some time, then continue the loop. + const iterationDeltaInMs = Date.now() - iterationStartTimeInMs; + const delayDurationInMs = Math.max(this._loopIntervalInMs - iterationDeltaInMs, 0); logger.verbose( - `[${this._id}] Pausing the EventProcessor loop for ${this._loopIntervalInMs} ms.` + `[${this._id}] Pausing the EventProcessor loop for ${delayDurationInMs} ms.` ); - // swallow the error since it's fine to exit early from delay - await delayWithoutThrow(this._loopIntervalInMs, abortSignal); + // Swallow the error since it's fine to exit early from the delay. + await delayWithoutThrow(delayDurationInMs, abortSignal); } } @@ -480,6 +448,68 @@ export class EventProcessor { this._isRunning = false; } + private async _performLoadBalancing( + loadBalancingStrategy: LoadBalancingStrategy, + partitionIds: string[], + abortSignal: AbortSignalLike + ) { + if (abortSignal.aborted) throw new AbortError("The operation was aborted."); + + // Retrieve current partition ownership details from the datastore. + const partitionOwnership = await this._checkpointStore.listOwnership( + this._fullyQualifiedNamespace, + this._eventHubName, + this._consumerGroup + ); + + if (abortSignal.aborted) throw new AbortError("The operation was aborted."); + + const partitionOwnershipMap = new Map(); + const nonAbandonedPartitionOwnershipMap = new Map(); + const partitionsToRenew: string[] = []; + + // Separate abandoned ownerships from claimed ownerships. + // We only want to pass active partition ownerships to the + // load balancer, but we need to hold onto the abandoned + // partition ownerships because we need the etag to claim them. + for (const ownership of partitionOwnership) { + partitionOwnershipMap.set(ownership.partitionId, ownership); + if (!isAbandoned(ownership)) { + nonAbandonedPartitionOwnershipMap.set(ownership.partitionId, ownership); + } + if ( + ownership.ownerId === this._id && + this._pumpManager.isReceivingFromPartition(ownership.partitionId) + ) { + partitionsToRenew.push(ownership.partitionId); + } + } + + // Pass the list of all the partition ids and the collection of claimed partition ownerships + // to the load balance strategy. + // The load balancing strategy only needs to know the full list of partitions, + // and which of those are currently claimed. + // Since abandoned partitions are no longer claimed, we exclude them. + const partitionsToClaim = loadBalancingStrategy.getPartitionsToCliam( + this._id, + nonAbandonedPartitionOwnershipMap, + partitionIds + ); + partitionsToClaim.push(...partitionsToRenew); + + const uniquePartitionsToClaim = new Set(partitionsToClaim); + for (const partitionToClaim of uniquePartitionsToClaim) { + let partitionOwnershipRequest: PartitionOwnership; + + partitionOwnershipRequest = this._createPartitionOwnershipRequest( + partitionOwnershipMap, + partitionToClaim + ); + + await this._claimOwnership(partitionOwnershipRequest, abortSignal); + } + } + /** * This is called when there are errors that are not specific to a partition (ex: load balancing) */ @@ -527,7 +557,7 @@ export class EventProcessor { this._abortController = new AbortController(); logger.verbose(`[${this._id}] Starting an EventProcessor.`); - if (targetWithoutOwnership(this._processingTarget)) { + if (this._processingTarget) { logger.verbose(`[${this._id}] Single partition target: ${this._processingTarget}`); this._loopTask = this._runLoopForSinglePartition( this._processingTarget, @@ -536,7 +566,7 @@ export class EventProcessor { } else { logger.verbose(`[${this._id}] Multiple partitions, using load balancer`); this._loopTask = this._runLoopWithLoadBalancing( - this._processingTarget, + this._loadBalancingStrategy, this._abortController.signal ); } @@ -575,7 +605,7 @@ export class EventProcessor { logger.verbose(`[${this._id}] EventProcessor stopped.`); } - if (targetWithoutOwnership(this._processingTarget)) { + if (this._processingTarget) { logger.verbose(`[${this._id}] No partitions owned, skipping abandoning.`); } else { await this.abandonPartitionOwnerships(); @@ -624,7 +654,3 @@ function getStartPosition( return startPosition; } - -function targetWithoutOwnership(target: PartitionLoadBalancer | string): target is string { - return typeof target === "string"; -} diff --git a/sdk/eventhub/event-hubs/src/index.ts b/sdk/eventhub/event-hubs/src/index.ts index c5aba75bbfc3..b54f4f6ce75d 100644 --- a/sdk/eventhub/event-hubs/src/index.ts +++ b/sdk/eventhub/event-hubs/src/index.ts @@ -9,11 +9,13 @@ export { LastEnqueuedEventProperties } from "./eventHubReceiver"; export { OperationOptions } from "./util/operationOptions"; export { EventHubClientOptions, + EventHubConsumerClientOptions, + LoadBalancingOptions, SendBatchOptions, CreateBatchOptions, GetPartitionIdsOptions, GetPartitionPropertiesOptions, - GetEventHubPropertiesOptions + GetEventHubPropertiesOptions, } from "./models/public"; export { EventHubConsumerClient } from "./eventHubConsumerClient"; export { EventHubProducerClient } from "./eventHubProducerClient"; @@ -25,7 +27,7 @@ export { ProcessErrorHandler, ProcessInitializeHandler, ProcessCloseHandler, - ProcessEventsHandler + ProcessEventsHandler, } from "./eventHubConsumerClientModels"; export { EventPosition, latestEventPosition, earliestEventPosition } from "./eventPosition"; export { PartitionProperties, EventHubProperties } from "./managementClient"; diff --git a/sdk/eventhub/event-hubs/src/loadBalancerStrategies/balancedStrategy.ts b/sdk/eventhub/event-hubs/src/loadBalancerStrategies/balancedStrategy.ts new file mode 100644 index 000000000000..fe3212a2c07e --- /dev/null +++ b/sdk/eventhub/event-hubs/src/loadBalancerStrategies/balancedStrategy.ts @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { PartitionOwnership } from "../eventProcessor"; +import { LoadBalancingStrategy, listAvailablePartitions } from "./loadBalancingStrategy"; + +/** + * The BalancedLoadBalancerStrategy is meant to be used when the user + * wants to reach a load balanced state with less partition 'thrashing'. + * + * Partition thrashing - where a partition changes owners - is minimized + * by only returning a single partition to claim at a time. + * This minimizes the number of times a partition should need to be stolen. + * @internal + * @ignore + */ +export class BalancedLoadBalancingStrategy implements LoadBalancingStrategy { + /** + * Creates an instance of BalancedLoadBalancingStrategy. + * + * @param _partitionOwnershipExpirationIntervalInMs The length of time a partition claim is valid. + */ + constructor(private readonly _partitionOwnershipExpirationIntervalInMs: number) {} + + /** + * Implements load balancing by taking into account current ownership and + * the full set of partitions in the Event Hub. + * @param ourOwnerId The id we should assume is _our_ id when checking for ownership. + * @param claimedPartitionOwnershipMap The current claimed ownerships for partitions. + * @param partitionIds Partitions to assign owners to. + * @returns Partition ids to claim. + */ + public getPartitionsToCliam( + ourOwnerId: string, + claimedPartitionOwnershipMap: Map, + partitionIds: string[] + ): string[] { + const claimablePartitions = listAvailablePartitions( + ourOwnerId, + claimedPartitionOwnershipMap, + partitionIds, + this._partitionOwnershipExpirationIntervalInMs + ); + + if (!claimablePartitions.length) { + return []; + } + + const randomIndex = Math.floor(Math.random() * claimablePartitions.length); + return [claimablePartitions[randomIndex]]; + } +} diff --git a/sdk/eventhub/event-hubs/src/loadBalancerStrategies/greedyStrategy.ts b/sdk/eventhub/event-hubs/src/loadBalancerStrategies/greedyStrategy.ts new file mode 100644 index 000000000000..77b6604e99a7 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/loadBalancerStrategies/greedyStrategy.ts @@ -0,0 +1,39 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { PartitionOwnership } from "../eventProcessor"; +import { LoadBalancingStrategy, listAvailablePartitions } from "./loadBalancingStrategy"; + +/** + * @internal + * @ignore + */ +export class GreedyLoadBalancingStrategy implements LoadBalancingStrategy { + /** + * Creates an instance of GreedyLoadBalancingStrategy. + * + * @param _partitionOwnershipExpirationIntervalInMs The length of time a partition claim is valid. + */ + constructor(private readonly _partitionOwnershipExpirationIntervalInMs: number) {} + + /** + * Implements load balancing by taking into account current ownership and + * the new set of partitions to add. + * @param ourOwnerId The id we should assume is _our_ id when checking for ownership. + * @param claimedPartitionOwnershipMap The current claimed ownerships for partitions. + * @param partitionIds Partitions to assign owners to. + * @returns Partition ids to claim. + */ + public getPartitionsToCliam( + ourOwnerId: string, + claimedPartitionOwnershipMap: Map, + partitionIds: string[] + ): string[] { + return listAvailablePartitions( + ourOwnerId, + claimedPartitionOwnershipMap, + partitionIds, + this._partitionOwnershipExpirationIntervalInMs + ); + } +} diff --git a/sdk/eventhub/event-hubs/src/loadBalancerStrategies/loadBalancingStrategy.ts b/sdk/eventhub/event-hubs/src/loadBalancerStrategies/loadBalancingStrategy.ts new file mode 100644 index 000000000000..1aa0de11dc81 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/loadBalancerStrategies/loadBalancingStrategy.ts @@ -0,0 +1,397 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { PartitionOwnership } from "../eventProcessor"; +import { logger } from "../log"; + +/** + * Determines which partitions to claim as part of load balancing. + * @internal + * @ignore + */ +export interface LoadBalancingStrategy { + /** + * Implements load balancing by taking into account current ownership and + * the full set of partitions in the Event Hub. + * @param ourOwnerId The id we should assume is _our_ id when checking for ownership. + * @param claimedPartitionOwnershipMap The current claimed ownerships for partitions. + * @param partitionIds Partitions to assign owners to. + * @returns Partition ids to claim. + */ + getPartitionsToCliam( + ownerId: string, + claimedPartitionOwnershipMap: Map, + partitionIds: string[] + ): string[]; +} + +/** + * Counts of the EventProcessors that currently own partitions. + * @internal + * @ignore + */ +interface EventProcessorCounts { + /** + * The # of EventProcessors that only own the required # of + * partitions. + */ + haveRequiredPartitions: number; + /** + * The # of EventProcessors that currently own the required # + * of partitions + 1 additional (ie, handling the case where + * the number of partitions is not evenly divisible by the # of + * EventProcessors). + */ + haveAdditionalPartition: number; + /** + * EventProcessors which have more than the required or even required + 1 + * number of partitions. These will eventually be downsized by other + * EventProcessors as they acquire their required number of partitions. + */ + haveTooManyPartitions: number; +} + +/** + * This method will create a new map of partition id and PartitionOwnership containing only those partitions + * that are actively owned. + * All entries in the original map that haven't been modified for a duration of time greater than the allowed + * inactivity time limit are assumed to be owned by dead event processors. + * These will not be included in the map returned by this method. + * + * @param partitionOwnershipMap The existing PartitionOwnerships mapped by partition. + * @param expirationIntervalInMs The length of time a PartitionOwnership claim is valid. + * @ignore + * @internal + */ +function getActivePartitionOwnerships( + partitionOwnershipMap: Map, + expirationIntervalInMs: number +): Map { + const activePartitionOwnershipMap: Map = new Map(); + partitionOwnershipMap.forEach((partitionOwnership: PartitionOwnership, partitionId: string) => { + // If lastModifiedtimeInMs is missing, assume it is inactive. + if ( + typeof partitionOwnership.lastModifiedTimeInMs === "undefined" || + partitionOwnership.lastModifiedTimeInMs === null + ) { + return; + } + + const timeSincePartitionClaimed = Date.now() - partitionOwnership.lastModifiedTimeInMs; + if (timeSincePartitionClaimed < expirationIntervalInMs && partitionOwnership.ownerId) { + activePartitionOwnershipMap.set(partitionId, partitionOwnership); + } + }); + + return activePartitionOwnershipMap; +} + +/** + * Calculates the minimum number of partitions each EventProcessor should own, + * and the number of EventProcessors that should have an extra partition assigned. + * @param ownerToOwnershipMap The current ownerships for partitions. + * @param partitionIds The full list of the Event Hub's partition ids. + * @ignore + * @internal + */ +function calculateBalancedLoadCounts( + ownerToOwnershipMap: Map, + partitionIds: string[] +): { minPartitionsPerOwner: number; requiredNumberOfOwnersWithExtraPartition: number } { + // Calculate the minimum number of partitions every EventProcessor should own when the load + // is evenly distributed. + const minPartitionsPerOwner = Math.floor(partitionIds.length / ownerToOwnershipMap.size); + + // If the number of partitions in the Event Hub is not evenly divisible by the number of active + // EventProcesrrors, some EventProcessors may own 1 partition in addition to the minimum when the + // load is balanced. + // Calculate the number of EventProcessors that can own an additional partition. + const requiredNumberOfOwnersWithExtraPartition = partitionIds.length % ownerToOwnershipMap.size; + + return { + minPartitionsPerOwner, + requiredNumberOfOwnersWithExtraPartition + }; +} + +/** + * Counts the EventProcessors and tallies them by type. + * + * To be in balance we need to make sure that each EventProcessor is only consuming + * their fair share. + * + * When the partitions are divvied up we will sometimes end up with some EventProcessors + * that will have 1 more partition than others. + * This can happen if the number of partitions is not evenly divisible by the number of EventProcessors. + * + * So this function largely exists to support isLoadBalanced() and + * shouldOwnMorePartitions(), both of which depend on knowing if our current list + * of EventProcessors is actually in the proper state. + * + * @param minPartitionsPerOwner The number of required partitions per EventProcessor. + * @param ownerToOwnershipMap The current ownerships for partitions. + * @internal + * @ignore + */ +function getEventProcessorCounts( + minPartitionsPerOwner: number, + ownerToOwnershipMap: Map +): EventProcessorCounts { + const counts: EventProcessorCounts = { + haveRequiredPartitions: 0, + haveAdditionalPartition: 0, + haveTooManyPartitions: 0 + }; + + for (const ownershipList of ownerToOwnershipMap.values()) { + const numberOfPartitions = ownershipList.length; + + // there are basically three kinds of partition counts + // for a processor: + + if (numberOfPartitions === minPartitionsPerOwner) { + // 1. Has _exactly_ the required number of partitions + counts.haveRequiredPartitions++; + } else if (numberOfPartitions === minPartitionsPerOwner + 1) { + // 2. Has the required number plus one extra (correct in cases) + // where the # of partitions is not evenly divisible by the + // number of processors. + counts.haveAdditionalPartition++; + } else if (numberOfPartitions > minPartitionsPerOwner + 1) { + // 3. has more than the possible # of partitions required + counts.haveTooManyPartitions++; + } + } + + return counts; +} + +/** + * Validates that we are currently in a balanced state - all EventProcessors own the + * minimum required number of partitions (and additional partitions, if the # of partitions + * is not evenly divisible by the # of EventProcessors). + * + * @param requiredNumberOfOwnersWithExtraPartition The # of EventProcessors that process an additional partition, in addition to the required minimum. + * @param totalExpectedProcessors The total # of EventProcessors we expect. + * @param eventProcessorCounts EventProcessor counts, grouped by criteria. + * @ignore + * @internal + */ +function isLoadBalanced( + requiredNumberOfOwnersWithExtraPartition: number, + totalExpectedEventProcessors: number, + { haveAdditionalPartition, haveRequiredPartitions }: EventProcessorCounts +): boolean { + return ( + haveAdditionalPartition === requiredNumberOfOwnersWithExtraPartition && + haveRequiredPartitions + haveAdditionalPartition === totalExpectedEventProcessors + ); +} + +/** + * Determines the number of new partitions to claim for this particular processor. + * + * @param minRequired The minimum required number of partitions. + * @param requiredNumberOfOwnersWithExtraPartition The current number of processors that should have an additional partition. + * @param numPartitionsOwnedByUs The number of partitions we currently own. + * @param eventProcessorCounts Processors, grouped by criteria. + * @ignore + * @internal + */ +function getNumberOfPartitionsToClaim( + minRequiredPartitionCount: number, + requiredNumberOfOwnersWithExtraPartition: number, + numPartitionsOwnedByUs: number, + { haveAdditionalPartition, haveTooManyPartitions }: EventProcessorCounts +): number { + let actualRequiredPartitionCount = minRequiredPartitionCount; + + if ( + requiredNumberOfOwnersWithExtraPartition > 0 && + // Eventually the `haveTooManyPartitions` will decay into `haveAdditionalPartition` + // EventProcessors as partitions are balanced to consumers that aren't at par. + // We can consider them to be `haveAdditionalPartition` EventProcessors for our purposes. + haveAdditionalPartition + haveTooManyPartitions < requiredNumberOfOwnersWithExtraPartition + ) { + // Overall we don't have enough EventProcessors that are taking on an additional partition + // so we should attempt to. + actualRequiredPartitionCount = minRequiredPartitionCount + 1; + } + return actualRequiredPartitionCount - numPartitionsOwnedByUs; +} + +/** + * Determines which partitions can be stolen from other owners while maintaining + * a balanced state. + * @param numberOfPartitionsToClaim The number of partitions the owner needs to claim to reach a balanced state. + * @param minPartitionsPerOwner The minimum number of partitions each owner needs for the partition load to be balanced. + * @param requiredNumberOfOwnersWithExtraPartition The number of owners that should have 1 extra partition. + * @param ourOwnerId The id of _our_ owner. + * @param ownerToOwnershipMap The current ownerships for partitions. + * @internal + * @ignore + */ +function findPartitionsToSteal( + numberOfPartitionsToClaim: number, + minPartitionsPerOwner: number, + requiredNumberOfOwnersWithExtraPartition: number, + ourOwnerId: string, + ownerToOwnershipMap: Map +): string[] { + const partitionsToSteal: string[] = []; + // Create a list of PartitionOwnership lists that we can steal from. + const listOfPartitionOwnerships: Array = []; + ownerToOwnershipMap.forEach((partitionOwnerships, ownerId) => { + if (ownerId === ourOwnerId || partitionOwnerships.length <= minPartitionsPerOwner) return; + listOfPartitionOwnerships.push(partitionOwnerships); + }); + + // Sort the list in descending order based on the length of each element. + listOfPartitionOwnerships.sort((a, b) => { + if (a.length > b.length) return -1; + if (a.length < b.length) return 1; + return 0; + }); + + // Attempt to steal partitions from EventProcessors that have the most partitions 1st, + // then work our way down. + let ownersEncounteredWithExtraPartitions = 0; + let currentPartitionOwnershipList = listOfPartitionOwnerships.shift(); + while (numberOfPartitionsToClaim > 0 && currentPartitionOwnershipList) { + let ownersExpectedPartitionCount = minPartitionsPerOwner; + // Determine if the current owner should be allowed to have an extra partition. + if (ownersEncounteredWithExtraPartitions < requiredNumberOfOwnersWithExtraPartition) { + ownersExpectedPartitionCount++; + } + ownersEncounteredWithExtraPartitions++; + + let numberAvailableToSteal = + currentPartitionOwnershipList.length - ownersExpectedPartitionCount; + // Claim as many random partitions as possible. + while (Math.min(numberOfPartitionsToClaim, numberAvailableToSteal)) { + const indexToClaim = Math.floor(Math.random() * currentPartitionOwnershipList.length); + partitionsToSteal.push(currentPartitionOwnershipList.splice(indexToClaim, 1)[0].partitionId); + numberOfPartitionsToClaim--; + numberAvailableToSteal--; + } + + // Move on to the next list of PartitionOwnership. + currentPartitionOwnershipList = listOfPartitionOwnerships.shift(); + } + + return partitionsToSteal; +} + +/** + * Identifies all of the partitions that can be claimed by the specified owner for + * that owner to reach a balanced state. + * @param OwnerId The id we should assume is _our_ id when checking for ownership. + * @param claimedPartitionOwnershipMap The current claimed ownerships for partitions. + * @param partitionIds Partitions to assign owners to. + * @param expirationIntervalInMs The length of time a partition claim is valid. + * @returns Partition ids that may be claimed. + * @internal + * @ignore + */ +export function listAvailablePartitions( + ownerId: string, + claimedPartitionOwnershipMap: Map, + partitionIds: string[], + expirationIntervalInMs: number +): string[] { + if (!partitionIds.length) { + return []; + } + + // Collect only the PartitionOwnership that have been updated within the expiration interval. + // Any PartitionOwnership that has been updated outside the expiration interval can be claimed. + const activePartitionOwnershipMap = getActivePartitionOwnerships( + claimedPartitionOwnershipMap, + expirationIntervalInMs + ); + logger.verbose( + `[${ownerId}] Number of active ownership records: ${activePartitionOwnershipMap.size}.` + ); + + if (activePartitionOwnershipMap.size === 0) { + // All partitions in this Event Hub are available to claim. + return partitionIds; + } + + // Map ownerIds to the partitions they own so that we can determine how many each owner has. + const ownerToOwnershipMap = new Map(); + for (const activeOwnership of activePartitionOwnershipMap.values()) { + const partitionOwnershipList = ownerToOwnershipMap.get(activeOwnership.ownerId) || []; + + partitionOwnershipList.push(activeOwnership); + ownerToOwnershipMap.set(activeOwnership.ownerId, partitionOwnershipList); + } + + // Add the current EventProcessor to the map of owners to ownerships if it doesn't exist. + if (!ownerToOwnershipMap.has(ownerId)) { + ownerToOwnershipMap.set(ownerId, []); + } + + logger.info(`[${ownerId}] Number of active event processors: ${ownerToOwnershipMap.size}.`); + + const { + minPartitionsPerOwner, + requiredNumberOfOwnersWithExtraPartition + } = calculateBalancedLoadCounts(ownerToOwnershipMap, partitionIds); + + logger.verbose( + `[${ownerId}] Expected minimum number of partitions per event processor: ${minPartitionsPerOwner},` + + `expected number of event processors with additional partition: ${requiredNumberOfOwnersWithExtraPartition}.` + ); + + // Get some stats representing the current state the world with regards to how balanced the + // partitions are across EventProcessors. + const eventProcessorCounts = getEventProcessorCounts(minPartitionsPerOwner, ownerToOwnershipMap); + + if ( + isLoadBalanced( + requiredNumberOfOwnersWithExtraPartition, + ownerToOwnershipMap.size, + eventProcessorCounts + ) + ) { + // When the partitions are evenly distributed, no change required. + return []; + } + + let numberOfPartitionsToClaim = getNumberOfPartitionsToClaim( + minPartitionsPerOwner, + requiredNumberOfOwnersWithExtraPartition, + ownerToOwnershipMap.get(ownerId)!.length, + eventProcessorCounts + ); + + if (numberOfPartitionsToClaim <= 0) { + return []; + } + + const partitionsToClaim: string[] = []; + const unclaimedPartitionIds = partitionIds.filter((id) => !activePartitionOwnershipMap.has(id)); + + // Prioritize getting unclaimed partitions first. + while (Math.min(numberOfPartitionsToClaim, unclaimedPartitionIds.length)) { + const indexToClaim = Math.floor(Math.random() * unclaimedPartitionIds.length); + partitionsToClaim.push(unclaimedPartitionIds.splice(indexToClaim, 1)[0]); + numberOfPartitionsToClaim--; + } + + if (numberOfPartitionsToClaim === 0) { + return partitionsToClaim; + } + + // Find partitions that can be stolen from other EventProcessors. + const partitionsToSteal = findPartitionsToSteal( + numberOfPartitionsToClaim, + minPartitionsPerOwner, + requiredNumberOfOwnersWithExtraPartition, + ownerId, + ownerToOwnershipMap + ); + + return partitionsToClaim.concat(partitionsToSteal); +} diff --git a/sdk/eventhub/event-hubs/src/loadBalancerStrategies/unbalancedStrategy.ts b/sdk/eventhub/event-hubs/src/loadBalancerStrategies/unbalancedStrategy.ts new file mode 100644 index 000000000000..549b7970bb8e --- /dev/null +++ b/sdk/eventhub/event-hubs/src/loadBalancerStrategies/unbalancedStrategy.ts @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { PartitionOwnership } from "../eventProcessor"; +import { LoadBalancingStrategy } from "./loadBalancingStrategy"; + +/** + * The UnbalancedLoadBalancingStrategy does no actual load balancing. + * It is intended to be used when you want to avoid load balancing + * and consume a set of partitions. + * @internal + * @ignore + */ +export class UnbalancedLoadBalancingStrategy implements LoadBalancingStrategy { + /** + * Implements load balancing by taking into account current ownership and + * the full set of partitions in the Event Hub. + * @param _ourOwnerId The id we should assume is _our_ id when checking for ownership. + * @param _claimedPartitionOwnershipMap The current claimed ownerships for partitions. + * @param partitionIds Partitions to assign owners to. + * @returns Partition ids to claim. + */ + public getPartitionsToCliam( + _ourOwnerId: string, + _claimedPartitionOwnershipMap: Map, + partitionIds: string[] + ): string[] { + return partitionIds; + } +} diff --git a/sdk/eventhub/event-hubs/src/models/private.ts b/sdk/eventhub/event-hubs/src/models/private.ts index af05c2e35f2d..a5464e88ddb7 100644 --- a/sdk/eventhub/event-hubs/src/models/private.ts +++ b/sdk/eventhub/event-hubs/src/models/private.ts @@ -1,9 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { PartitionLoadBalancer } from "../partitionLoadBalancer"; import { RetryOptions } from "@azure/core-amqp"; import { SubscribeOptions } from "../eventHubConsumerClientModels"; +import { LoadBalancingStrategy } from "../loadBalancerStrategies/loadBalancingStrategy"; /** * The set of options to configure the behavior of an `EventHubProducer`. @@ -40,7 +40,7 @@ export type OperationNames = "getEventHubProperties" | "getPartitionIds" | "getP * @internal * @ignore */ -export interface CommonEventProcessorOptions // make the 'maxBatchSize', 'maxWaitTimeInSeconds', 'ownerLevel' fields required extends // for our internal classes (these are optional for external users) +export interface CommonEventProcessorOptions extends Required>, Pick< SubscribeOptions, @@ -51,14 +51,9 @@ export interface CommonEventProcessorOptions // make the 'maxBatchSize', 'maxWa > > { /** - * The amount of time to wait between each attempt at claiming partitions. + * A load balancing strategy that determines how to claim partitions. */ - loopIntervalInMs?: number; - - /** - * A load balancer to use to find targets or a specific partition to target. - */ - processingTarget?: PartitionLoadBalancer | string; + loadBalancingStrategy: LoadBalancingStrategy; /** * An optional ownerId to use rather than using an internally generated ID diff --git a/sdk/eventhub/event-hubs/src/models/public.ts b/sdk/eventhub/event-hubs/src/models/public.ts index d761d5e42562..73f8deaddf6c 100644 --- a/sdk/eventhub/event-hubs/src/models/public.ts +++ b/sdk/eventhub/event-hubs/src/models/public.ts @@ -128,6 +128,69 @@ export interface EventHubClientOptions { userAgent?: string; } +/** + * Describes the options that can be provided while creating the EventHubConsumerClient. + * - `loadBalancingOptions`: Options to tune how the EventHubConsumerClient claims partitions. + * - `userAgent` : A string to append to the built in user agent string that is passed as a connection property + * to the service. + * - `webSocketOptions` : Options to configure the channelling of the AMQP connection over Web Sockets. + * - `websocket` : The WebSocket constructor used to create an AMQP connection if you choose to make the connection + * over a WebSocket. + * - `webSocketConstructorOptions` : Options to pass to the Websocket constructor when you choose to make the connection + * over a WebSocket. + * - `retryOptions` : The retry options for all the operations on the EventHubConsumerClient. + * A simple usage can be `{ "maxRetries": 4 }`. + * + * Example usage: + * ```js + * { + * retryOptions: { + * maxRetries: 4 + * } + * } + * ``` + */ +export interface EventHubConsumerClientOptions extends EventHubClientOptions { + /** + * Options to tune how the EventHubConsumerClient claims partitions. + */ + loadBalancingOptions?: LoadBalancingOptions; +} + +/** + * An options bag to configure load balancing settings. + */ +export interface LoadBalancingOptions { + /** + * Whether to apply a greedy or a more balanced approach when + * claiming partitions. + * + * - balanced: The `EventHubConsumerClient` will take a measured approach to + * requesting partition ownership when balancing work with other clients, + * slowly claiming partitions until a stabilized distribution is achieved. + * + * - greedy: The `EventHubConsumerClient` will attempt to claim ownership + * of its fair share of partitions aggressively when balancing work with + * other clients. + * + * This option is ignored when either: + * - `CheckpointStore` is __not__ provided to the `EventHubConsumerClient`. + * - `subscribe()` is called for a single partition. + * Default: balanced + */ + strategy?: "balanced" | "greedy"; + /** + * The length of time between attempts to claim partitions. + * Default: 10000 + */ + updateIntervalInMs?: number; + /** + * The length of time a partition claim is valid. + * Default: 60000 + */ + partitionOwnershipExpirationIntervalInMs?: number; +} + /** * Options to configure the `createBatch` method on the `EventHubProducerClient`. * - `partitionKey` : A value that is hashed to produce a partition assignment. diff --git a/sdk/eventhub/event-hubs/src/partitionLoadBalancer.ts b/sdk/eventhub/event-hubs/src/partitionLoadBalancer.ts deleted file mode 100644 index 65d608dbd410..000000000000 --- a/sdk/eventhub/event-hubs/src/partitionLoadBalancer.ts +++ /dev/null @@ -1,403 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -import { PartitionOwnership } from "./eventProcessor"; -import { logger } from "./log"; - -/** - * Implements a load balancing algorithm for determining which consumers - * own which partitions. - * @ignore - * @internal - */ -export interface PartitionLoadBalancer { - /** - * Implements load balancing by taking into account current ownership and - * the new set of partitions to add. - * @param ownerId The id we should assume is _our_ id when checking for ownership. - * @param partitionOwnershipMap The current ownerships for partitions. - * @param partitionsToAdd New partitions to assign owners to. - * @returns Partition ids to claim. - */ - loadBalance( - ownerId: string, - partitionOwnershipMap: Map, - partitionsToAdd: string[] - ): string[]; -} - -/** - * This class does no load balancing - it's intended to be used when - * you want to avoid load balancing and consume a set of partitions (or all - * available partitions) - * @internal - * @ignore - */ -export class GreedyPartitionLoadBalancer implements PartitionLoadBalancer { - private partitionsToClaim?: Set; - - /** - * @param partitionIds An optional set of partition IDs. undefined means all partitions. - */ - constructor(partitionIds?: string[]) { - logger.verbose( - `GreedyPartitionLoadBalancer created. Watching ${ - partitionIds ? "(" + partitionIds.join(",") + ")" : "all" - }.` - ); - this.partitionsToClaim = partitionIds && new Set(partitionIds); - } - - loadBalance( - _ownerId: string, - _partitionOwnershipMap: Map, - partitionsToAdd: string[] - ): string[] { - let potential: string[] = partitionsToAdd; - - if (this.partitionsToClaim) { - const partitionsToClaim = this.partitionsToClaim; - potential = partitionsToAdd.filter((part) => partitionsToClaim.has(part)); - } - - return potential; - } -} - -/** - * This class is responsible for balancing the load of processing events from all partitions of an Event Hub by - * distributing the number of partitions uniformly among all the active EventProcessors. - * - * This load balancer will retrieve partition ownership details from the CheckpointStore to find the number of - * active EventProcessor. It uses the last modified time to decide if an EventProcessor is active. If a - * partition ownership entry has not be updated for a specified duration of time, the owner of that partition is - * considered inactive and the partition is available for other EventProcessors to own. - * @class PartitionLoadBalancer - * @internal - * @ignore - */ -export class FairPartitionLoadBalancer implements PartitionLoadBalancer { - private _inactiveTimeLimitInMS: number; - - /** - * Creates an instance of PartitionBasedLoadBalancer. - * - * @param ownerId The identifier of the Event Processor that owns this load balancer. - * @param inactiveTimeLimitInMS The time to wait for an update on an ownership record before - * assuming the owner of the partition is inactive. - * */ - constructor(inactiveTimeLimitInMS: number) { - logger.verbose( - `FairPartitionLoadBalancer created inactive time limit: ${inactiveTimeLimitInMS}ms` - ); - this._inactiveTimeLimitInMS = inactiveTimeLimitInMS; - } - - /* - * Find the event processor that owns the maximum number of partitions and steal a random partition - * from it. - */ - private _findPartitionToSteal( - ourOwnerId: string, - ownerPartitionMap: Map - ): string { - let maxList: PartitionOwnership[] = []; - let maxPartitionsOwnedByAnyEventProcessor = Number.MIN_VALUE; - let ownerId; - ownerPartitionMap.forEach((ownershipList: PartitionOwnership[], ownerId: string) => { - if (ownershipList.length > maxPartitionsOwnedByAnyEventProcessor) { - maxPartitionsOwnedByAnyEventProcessor = ownershipList.length; - maxList = ownershipList; - ownerId = ownerId; - } - }); - logger.verbose( - `[${ourOwnerId}] Owner id ${ownerId} owns ${maxList.length} partitions, stealing a partition from it.` - ); - return maxList[Math.floor(Math.random() * maxList.length)].partitionId; - } - - /** - * Whether we should attempt to claim more partitions for this particular processor. - * - * @param minRequired The minimum required number of partitions. - * @param numEventProcessorsWithAdditionalPartition The current number of processors that have an additional partition. - * @param numPartitionsOwnedByUs The number of partitions we currently own. - * @param processorCounts Processors, grouped by criteria. - */ - private _shouldOwnMorePartitions( - minRequired: number, - numEventProcessorsWithAdditionalPartition: number, - numPartitionsOwnedByUs: number, - processorCounts: ProcessorCounts - ): boolean { - let actualRequired = minRequired; - - if ( - numEventProcessorsWithAdditionalPartition > 0 && - // eventually the `haveTooManyPartitions` will get decay into `haveAdditionalPartition` - // processors as partitions are balanced to consumers that aren't at par. We can - // consider them to be `haveAdditionalPartition` processors for our purposes. - processorCounts.haveAdditionalPartition + processorCounts.haveTooManyPartitions < - numEventProcessorsWithAdditionalPartition - ) { - // overall we don't have enough processors that are taking on an additional partition - // so we should attempt to. - actualRequired = minRequired + 1; - } - - return numPartitionsOwnedByUs < actualRequired; - } - - /** - * Validates that we are currently in a balanced state - all processors own the - * minimum required number of partitions (and additional partitions, if the # of partitions - * is not evenly divisible by the # of processors). - * - * @param requiredNumberOfEventProcessorsWithAdditionalPartition The # of processors that process an additional partition, in addition to the required minimum. - * @param totalExpectedProcessors The total # of processors we expect. - * @param processorCounts Processors, grouped by criteria. - */ - private _isLoadBalanced( - requiredNumberOfEventProcessorsWithAdditionalPartition: number, - totalExpectedProcessors: number, - processorCounts: ProcessorCounts - ): boolean { - return ( - processorCounts.haveAdditionalPartition === - requiredNumberOfEventProcessorsWithAdditionalPartition && - processorCounts.haveRequiredPartitions + processorCounts.haveAdditionalPartition === - totalExpectedProcessors - ); - } - - /** - * Counts the processors and tallying them by type. - * - * To be in balance we need to make sure that each processor is only consuming - * their fair share. - * - * When the partitions are divvied up we will sometimes end up with some processors - * that will have 1 more partition than others. This can happen if the number of - * partitions is not evenly divisible by the number of processors. - * - * So this function largely exists to support _isLoadBalanced() and - * _shouldOwnMorePartitions(), both of which depend on knowing if our current list - * of processors is actually in the proper state. - * - * @param numPartitionsRequired The number of required partitions per processor. - * @param ownerPartitionMap The current ownerships for partitions. - */ - private _getProcessorCounts( - numPartitionsRequired: number, - ownerPartitionMap: Map - ): ProcessorCounts { - const counts: ProcessorCounts = { - haveRequiredPartitions: 0, - haveAdditionalPartition: 0, - haveTooManyPartitions: 0 - }; - - for (const ownershipList of ownerPartitionMap.values()) { - const numberOfPartitions = ownershipList.length; - - // there are basically three kinds of partition counts - // for a processor: - - // 1. Has _exactly_ the required number of partitions - if (numberOfPartitions === numPartitionsRequired) { - counts.haveRequiredPartitions++; - } - - // 2. Has the required number plus one extra (correct in cases) - // where the # of partitions is not evenly divisible by the - // number of processors. - if (numberOfPartitions === numPartitionsRequired + 1) { - counts.haveAdditionalPartition++; - } - - // 3. has more than the possible # of partitions required - if (numberOfPartitions > numPartitionsRequired + 1) { - counts.haveTooManyPartitions++; - } - } - - return counts; - } - - /* - * This method will create a new map of partition id and PartitionOwnership containing only those partitions - * that are actively owned. All entries in the original map returned by CheckpointStore that haven't been - * modified for a duration of time greater than the allowed inactivity time limit are assumed to be owned by - * dead event processors. These will not be included in the map returned by this method. - */ - private _removeInactivePartitionOwnerships( - partitionOwnershipMap: Map - ): Map { - const activePartitionOwnershipMap: Map = new Map(); - partitionOwnershipMap.forEach((partitionOwnership: PartitionOwnership, partitionId: string) => { - const date = new Date(); - if ( - partitionOwnership.lastModifiedTimeInMs && - date.getTime() - partitionOwnership.lastModifiedTimeInMs < this._inactiveTimeLimitInMS && - partitionOwnership.ownerId - ) { - activePartitionOwnershipMap.set(partitionId, partitionOwnership); - } - }); - - return activePartitionOwnershipMap; - } - - /* - * This method works with the given partition ownership details and Event Hub partitions to evaluate whether the - * current Event Processor should take on the responsibility of processing more partitions. - */ - loadBalance( - ourOwnerId: string, - partitionOwnershipMap: Map, - partitionsToAdd: string[] - ): string[] { - // Remove all partitions ownership that have not been modified within the configured period of time. This means that the previous - // event processor that owned the partition is probably down and the partition is now eligible to be - // claimed by other event processors. - const activePartitionOwnershipMap = this._removeInactivePartitionOwnerships( - partitionOwnershipMap - ); - logger.verbose( - `[${ourOwnerId}] Number of active ownership records: ${activePartitionOwnershipMap.size}.` - ); - if (activePartitionOwnershipMap.size === 0) { - // If the active partition ownership map is empty, this is the first time an event processor is - // running or all Event Processors are down for this Event Hub, consumer group combination. All - // partitions in this Event Hub are available to claim. Choose a random partition to claim ownership. - return [partitionsToAdd[Math.floor(Math.random() * partitionsToAdd.length)]]; - } - - // Create a map of owner id and a list of partitions it owns - const ownerPartitionMap: Map = new Map(); - for (const activePartitionOwnership of activePartitionOwnershipMap.values()) { - const partitionOwnershipArray = ownerPartitionMap.get(activePartitionOwnership.ownerId) || []; - partitionOwnershipArray.push(activePartitionOwnership); - ownerPartitionMap.set(activePartitionOwnership.ownerId, partitionOwnershipArray); - } - - // add the current event processor to the map if it doesn't exist - if (!ownerPartitionMap.has(ourOwnerId)) { - ownerPartitionMap.set(ourOwnerId, []); - } - logger.info(`[${ourOwnerId}] Number of active event processors: ${ownerPartitionMap.size}.`); - - // Include any partitions this entity already owns in the list of partitions to claim. - const partitionsToClaim = (ownerPartitionMap.get(ourOwnerId) || []).map( - (ownership) => ownership.partitionId - ); - - // Find the minimum number of partitions every event processor should own when the load is - // evenly distributed. - const minPartitionsPerEventProcessor = Math.floor( - partitionsToAdd.length / ownerPartitionMap.size - ); - // If the number of partitions in Event Hub is not evenly divisible by number of active event processors, - // a few Event Processors may own 1 additional partition than the minimum when the load is balanced. Calculate - // the number of event processors that can own an additional partition. - const requiredNumberOfEventProcessorsWithAdditionalPartition = - partitionsToAdd.length % ownerPartitionMap.size; - - logger.verbose( - `[${ourOwnerId}] Expected minimum number of partitions per event processor: ${minPartitionsPerEventProcessor}, - expected number of event processors with additional partition: ${requiredNumberOfEventProcessorsWithAdditionalPartition}.` - ); - - const processorCounts = this._getProcessorCounts( - minPartitionsPerEventProcessor, - ownerPartitionMap - ); - - if ( - this._isLoadBalanced( - requiredNumberOfEventProcessorsWithAdditionalPartition, - ownerPartitionMap.size, - processorCounts - ) - ) { - logger.info(`[${ourOwnerId}] Load is balanced.`); - // If the partitions are evenly distributed among all active event processors, no change required. - return partitionsToClaim; - } - - if ( - !this._shouldOwnMorePartitions( - minPartitionsPerEventProcessor, - requiredNumberOfEventProcessorsWithAdditionalPartition, - ownerPartitionMap.get(ourOwnerId)!.length, - processorCounts - ) - ) { - logger.verbose( - `[${ourOwnerId}] This event processor owns ${ - ownerPartitionMap.get(ourOwnerId)!.length - } partitions and shouldn't own more.` - ); - // This event processor already has enough partitions and shouldn't own more yet - return partitionsToClaim; - } - logger.info( - `[${ourOwnerId}] Load is unbalanced and this event processor should own more partitions.` - ); - // If we have reached this stage, this event processor has to claim/steal ownership of at least 1 more partition - - // If some partitions are unclaimed, this could be because an event processor is down and - // it's partitions are now available for others to own or because event processors are just - // starting up and gradually claiming partitions to own or new partitions were added to Event Hub. - // Find any partition that is not actively owned and claim it. - - // OR - - // Find a partition to steal from another event processor. Pick the event processor that owns the highest - // number of partitions. - const unOwnedPartitionIds = []; - - for (const partitionId of partitionsToAdd) { - if (!activePartitionOwnershipMap.has(partitionId)) { - unOwnedPartitionIds.push(partitionId); - } - } - if (unOwnedPartitionIds.length === 0) { - logger.info( - `[${ourOwnerId}] No unclaimed partitions, stealing from another event processor.` - ); - partitionsToClaim.push(this._findPartitionToSteal(ourOwnerId, ownerPartitionMap)); - } else { - partitionsToClaim.push( - unOwnedPartitionIds[Math.floor(Math.random() * unOwnedPartitionIds.length)] - ); - } - - return partitionsToClaim; - } -} - -/** - * Counts of the processors that currently own partitions. - */ -interface ProcessorCounts { - /** - * The # of processors that only own the required # of - * partitions. - */ - haveRequiredPartitions: number; - /** - * The # of processors that currently own the required # - * of partitions + 1 additional (ie, handling the case where - * the number of partitions is not evenly divisible by the # of - * processors). - */ - haveAdditionalPartition: number; - /** - * Processors which have more than the required or even required + 1 - * number of partitions. These will eventually be downsized by other - * processors as they acquire their required number of partitions. - */ - haveTooManyPartitions: number; -} diff --git a/sdk/eventhub/event-hubs/src/util/constants.ts b/sdk/eventhub/event-hubs/src/util/constants.ts index e9da46faef1f..96a3fb178469 100644 --- a/sdk/eventhub/event-hubs/src/util/constants.ts +++ b/sdk/eventhub/event-hubs/src/util/constants.ts @@ -6,5 +6,5 @@ */ export const packageJsonInfo = { name: "@azure/event-hubs", - version: "5.2.2" + version: "5.3.0-preview.1" }; diff --git a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts index 5b990694be87..74eee6afa19c 100644 --- a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts @@ -19,6 +19,8 @@ import { InMemoryCheckpointStore } from "../src/inMemoryCheckpointStore"; import { EventProcessor, FullEventProcessorOptions } from "../src/eventProcessor"; import { SinonStubbedInstance, createStubInstance } from "sinon"; import { ConnectionContext } from "../src/connectionContext"; +import { BalancedLoadBalancingStrategy } from "../src/loadBalancerStrategies/balancedStrategy"; +import { GreedyLoadBalancingStrategy } from "../src/loadBalancerStrategies/greedyStrategy"; const should = chai.should(); const env = getEnvVars(); @@ -57,6 +59,20 @@ describe("EventHubConsumerClient", () => { let clientWithCheckpointStore: EventHubConsumerClient; let subscriptionHandlers: SubscriptionEventHandlers; let fakeEventProcessor: SinonStubbedInstance; + const fakeEventProcessorConstructor = ( + connectionContext: ConnectionContext, + subscriptionEventHandlers: SubscriptionEventHandlers, + checkpointStore: CheckpointStore, + options: FullEventProcessorOptions + ) => { + subscriptionEventHandlers.should.equal(subscriptionHandlers); + should.exist(connectionContext.managementSession); + isCheckpointStore(checkpointStore).should.be.ok; + + validateOptions(options); + + return fakeEventProcessor; + }; let validateOptions: (options: FullEventProcessorOptions) => void; @@ -82,21 +98,6 @@ describe("EventHubConsumerClient", () => { processError: async () => {} }; - const fakeEventProcessorConstructor = ( - connectionContext: ConnectionContext, - subscriptionEventHandlers: SubscriptionEventHandlers, - checkpointStore: CheckpointStore, - options: FullEventProcessorOptions - ) => { - subscriptionEventHandlers.should.equal(subscriptionHandlers); - should.exist(connectionContext.managementSession); - isCheckpointStore(checkpointStore).should.be.ok; - - validateOptions(options); - - return fakeEventProcessor; - }; - (client as any)["_createEventProcessor"] = fakeEventProcessorConstructor; (clientWithCheckpointStore as any)["_createEventProcessor"] = fakeEventProcessorConstructor; }); @@ -119,59 +120,335 @@ describe("EventHubConsumerClient", () => { ); }); - it("subscribe to single partition, no checkpoint store", () => { + it("subscribe to single partition, no checkpoint store, no loadBalancingOptions", () => { validateOptions = (options) => { // when the user doesn't pass a checkpoint store we give them a really simple set of - // defaults: InMemoryCheckpointStore and the GreedyLoadBalancer. + // defaults: + // - InMemoryCheckpointStore + // - UnbalancedLoadBalancingStrategy + // - loopIntervalInMs: 10000 // So we don't set an ownerlevel here - it's all in-memory and you can have as many // as you want (the user still has the option to pass their own via SubscribeOptions). should.not.exist(options.ownerLevel); // and if you don't specify a CheckpointStore we also assume you just want to read all partitions - // immediately so we bypass the FairPartitionLoadBalancer entirely - options.processingTarget!.constructor.name.should.equal("GreedyPartitionLoadBalancer"); + // immediately so we use the UnbalancedLoadBalancingStrategy. + options.loadBalancingStrategy.constructor.name.should.equal( + "UnbalancedLoadBalancingStrategy" + ); + + options.loopIntervalInMs.should.equal(10000); + options.processingTarget!.should.equal("0"); }; - const subscription = client.subscribe(subscriptionHandlers); + const subscription = client.subscribe("0", subscriptionHandlers); subscription.close(); fakeEventProcessor.stop.callCount.should.equal(1); }); - it("subscribe to single partition, WITH checkpoint store", () => { + it("subscribe to single partition, no checkpoint store, WITH loadBalancingOptions", () => { validateOptions = (options) => { - // when the user gives us a checkpoint store we treat their consumer client as - // a "production" ready client - they use their checkpoint store and - // the FairPartitionLoadBalancer + // When the user subscribes to a single partition, we always use the UnbalancedLoadBalancingStrategy. + // The loadBalancingOptions `strategy` and `partitionOwnershipExpirationIntervalInMs` fields are ignored. + // - InMemoryCheckpointStore + // - UnbalancedLoadBalancingStrategy + // - loopIntervalInMs: 10000 + + // So we don't set an ownerlevel here - it's all in-memory and you can have as many + // as you want (the user still has the option to pass their own via SubscribeOptions). + should.not.exist(options.ownerLevel); + + // and if you don't specify a CheckpointStore we also assume you just want to read all partitions + // immediately so we use the UnbalancedLoadBalancingStrategy. + options.loadBalancingStrategy.constructor.name.should.equal( + "UnbalancedLoadBalancingStrategy" + ); + + options.loopIntervalInMs.should.equal(20); + options.processingTarget!.should.equal("0"); + }; + + client = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString!, + service.path, + { + loadBalancingOptions: { + strategy: "greedy", // ignored + partitionOwnershipExpirationIntervalInMs: 100, // ignored + updateIntervalInMs: 20 + } + } + ); + (client as any)["_createEventProcessor"] = fakeEventProcessorConstructor; + + const subscription = client.subscribe("0", subscriptionHandlers); + + subscription.close(); + fakeEventProcessor.stop.callCount.should.equal(1); + }); + + it("subscribe to single partition, WITH checkpoint store, no loadBalancingOptions", () => { + validateOptions = (options) => { + // when the user gives us a checkpoint store but subscribes to a single partition, + // - they use their checkpoint store and the following defaults: + // - UnbalancedLoadBalancingStrategy + // - loopIntervalInMs: 10000 // To coordinate properly we set an owner level - this lets us // cooperate properly with other consumers within this group. options.ownerLevel!.should.equal(0); - // We're falling back to the actual production load balancer - // (which means we just don't override the partition load balancer field) - should.not.exist(options.processingTarget); + options.processingTarget!.should.equal("0"); + options.loadBalancingStrategy.constructor.name.should.equal( + "UnbalancedLoadBalancingStrategy" + ); + options.loopIntervalInMs.should.equal(10000); }; - clientWithCheckpointStore.subscribe(subscriptionHandlers); + clientWithCheckpointStore.subscribe("0", subscriptionHandlers); + }); + + it("subscribe to single partition, WITH checkpoint store, WITH loadBalancingOptions", () => { + validateOptions = (options) => { + // When the user subscribes to a single partition, we always use the UnbalancedLoadBalancingStrategy. + // The loadBalancingOptions `strategy` and `partitionOwnershipExpirationIntervalInMs` fields are ignored. + // - UnbalancedLoadBalancingStrategy + // - loopIntervalInMs: 10000 + + // To coordinate properly we set an owner level - this lets us + // cooperate properly with other consumers within this group. + options.ownerLevel!.should.equal(0); + + options.processingTarget!.should.equal("0"); + options.loadBalancingStrategy.constructor.name.should.equal( + "UnbalancedLoadBalancingStrategy" + ); + options.loopIntervalInMs.should.equal(20); + }; + + clientWithCheckpointStore = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString!, + service.path, + // it doesn't actually matter _what_ checkpoint store gets passed in + new InMemoryCheckpointStore(), + { + loadBalancingOptions: { + strategy: "greedy", // ignored + partitionOwnershipExpirationIntervalInMs: 100, // ignored + updateIntervalInMs: 20 + } + } + ); + (clientWithCheckpointStore as any)["_createEventProcessor"] = fakeEventProcessorConstructor; + + clientWithCheckpointStore.subscribe("0", subscriptionHandlers); }); - it("subscribe to all partitions, no checkpoint store", () => { + it("subscribe to all partitions, no checkpoint store, no loadBalancingOptions", () => { validateOptions = (options) => { + // when the user doesn't pass a checkpoint store we give them a really simple set of + // defaults: + // - InMemoryCheckpointStore + // - UnbalancedLoadBalancingStrategy + // - loopIntervalInMs: 10000 should.not.exist(options.ownerLevel); - options.processingTarget!.constructor.name.should.equal("GreedyPartitionLoadBalancer"); + options.loadBalancingStrategy.constructor.name.should.equal( + "UnbalancedLoadBalancingStrategy" + ); + options.loopIntervalInMs.should.equal(10000); }; client.subscribe(subscriptionHandlers); }); - it("subscribe to all partitions, WITH checkpoint store", () => { + it("subscribe to all partitions, no checkpoint store, WITH loadBalancingOptions", () => { + validateOptions = (options) => { + // When the user doesn't provide a checkpoint store, we always use the UnbalancedLoadBalancingStrategy. + // The loadBalancingOptions `strategy` and `partitionOwnershipExpirationIntervalInMs` fields are ignored. + // - InMemoryCheckpointStore + // - UnbalancedLoadBalancingStrategy + should.not.exist(options.ownerLevel); + options.loadBalancingStrategy.constructor.name.should.equal( + "UnbalancedLoadBalancingStrategy" + ); + options.loopIntervalInMs.should.equal(20); + }; + + client = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString!, + service.path, + { + loadBalancingOptions: { + strategy: "greedy", // ignored + partitionOwnershipExpirationIntervalInMs: 100, // ignored + updateIntervalInMs: 20 + } + } + ); + (client as any)["_createEventProcessor"] = fakeEventProcessorConstructor; + + client.subscribe(subscriptionHandlers); + }); + + it("subscribe to all partitions, WITH checkpoint store, no loadBalancingOptions", () => { + validateOptions = (options) => { + // when the user gives us a checkpoint store we treat their consumer client as + // a "production" ready client - they use their checkpoint store and the following + // defaults: + // - BalancedLoadBalancingStrategy + // - loopIntervalInMs: 10000 + // - partitionOwnershipExpirationIntervalInMs: 60000 + options.ownerLevel!.should.equal(0); + should.not.exist(options.processingTarget); + options.loadBalancingStrategy.constructor.name.should.equal( + "BalancedLoadBalancingStrategy" + ); + (options.loadBalancingStrategy as BalancedLoadBalancingStrategy)[ + "_partitionOwnershipExpirationIntervalInMs" + ].should.equal(60000); + options.loopIntervalInMs.should.equal(10000); + }; + + clientWithCheckpointStore.subscribe(subscriptionHandlers); + }); + + it("subscribe to all partitions, WITH checkpoint store, WITH loadBalancingOptions (greedy, updateInterval, expirationInterval)", () => { + validateOptions = (options) => { + // when the user gives us a checkpoint store and subscribes to all partitions, + // we use their loadBalancingOptions when provided. + options.ownerLevel!.should.equal(0); + should.not.exist(options.processingTarget); + options.loadBalancingStrategy.constructor.name.should.equal( + "GreedyLoadBalancingStrategy" + ); + (options.loadBalancingStrategy as GreedyLoadBalancingStrategy)[ + "_partitionOwnershipExpirationIntervalInMs" + ].should.equal(100); + options.loopIntervalInMs.should.equal(20); + }; + + clientWithCheckpointStore = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString!, + service.path, + // it doesn't actually matter _what_ checkpoint store gets passed in + new InMemoryCheckpointStore(), + { + loadBalancingOptions: { + strategy: "greedy", + partitionOwnershipExpirationIntervalInMs: 100, + updateIntervalInMs: 20 + } + } + ); + (clientWithCheckpointStore as any)["_createEventProcessor"] = fakeEventProcessorConstructor; + + clientWithCheckpointStore.subscribe(subscriptionHandlers); + }); + + it("subscribe to all partitions, WITH checkpoint store, WITH loadBalancingOptions (balanced, updateInterval, expirationInterval)", () => { + validateOptions = (options) => { + // when the user gives us a checkpoint store and subscribes to all partitions, + // we use their loadBalancingOptions when provided. + options.ownerLevel!.should.equal(0); + should.not.exist(options.processingTarget); + options.loadBalancingStrategy.constructor.name.should.equal( + "BalancedLoadBalancingStrategy" + ); + (options.loadBalancingStrategy as BalancedLoadBalancingStrategy)[ + "_partitionOwnershipExpirationIntervalInMs" + ].should.equal(100); + options.loopIntervalInMs.should.equal(20); + }; + + clientWithCheckpointStore = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString!, + service.path, + // it doesn't actually matter _what_ checkpoint store gets passed in + new InMemoryCheckpointStore(), + { + loadBalancingOptions: { + strategy: "balanced", + partitionOwnershipExpirationIntervalInMs: 100, + updateIntervalInMs: 20 + } + } + ); + (clientWithCheckpointStore as any)["_createEventProcessor"] = fakeEventProcessorConstructor; + + clientWithCheckpointStore.subscribe(subscriptionHandlers); + }); + + it("subscribe to all partitions, WITH checkpoint store, WITH loadBalancingOptions (updateInterval, expirationInterval)", () => { + validateOptions = (options) => { + // when the user gives us a checkpoint store and subscribes to all partitions, + // we use their loadBalancingOptions when provided. + options.ownerLevel!.should.equal(0); + should.not.exist(options.processingTarget); + options.loadBalancingStrategy.constructor.name.should.equal( + "BalancedLoadBalancingStrategy" + ); + (options.loadBalancingStrategy as BalancedLoadBalancingStrategy)[ + "_partitionOwnershipExpirationIntervalInMs" + ].should.equal(100); + options.loopIntervalInMs.should.equal(20); + }; + + clientWithCheckpointStore = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString!, + service.path, + // it doesn't actually matter _what_ checkpoint store gets passed in + new InMemoryCheckpointStore(), + { + loadBalancingOptions: { + // default 'strategy' is 'balanced' + partitionOwnershipExpirationIntervalInMs: 100, + updateIntervalInMs: 20 + } + } + ); + (clientWithCheckpointStore as any)["_createEventProcessor"] = fakeEventProcessorConstructor; + + clientWithCheckpointStore.subscribe(subscriptionHandlers); + }); + + it("subscribe to all partitions, WITH checkpoint store, WITH loadBalancingOptions (strategy)", () => { validateOptions = (options) => { + // when the user gives us a checkpoint store and subscribes to all partitions, + // we use their loadBalancingOptions when provided. options.ownerLevel!.should.equal(0); should.not.exist(options.processingTarget); + options.loadBalancingStrategy.constructor.name.should.equal( + "GreedyLoadBalancingStrategy" + ); + (options.loadBalancingStrategy as GreedyLoadBalancingStrategy)[ + "_partitionOwnershipExpirationIntervalInMs" + ].should.equal(60000); + options.loopIntervalInMs.should.equal(10000); }; + clientWithCheckpointStore = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString!, + service.path, + // it doesn't actually matter _what_ checkpoint store gets passed in + new InMemoryCheckpointStore(), + { + loadBalancingOptions: { + strategy: "greedy" + // defaults are used for the rest of the parameters. + } + } + ); + (clientWithCheckpointStore as any)["_createEventProcessor"] = fakeEventProcessorConstructor; + clientWithCheckpointStore.subscribe(subscriptionHandlers); }); @@ -575,10 +852,7 @@ describe("EventHubConsumerClient", () => { it("Receive from all partitions, no coordination", async function(): Promise { const logTester = new LogTester( - [ - "EventHubConsumerClient subscribing to all partitions, no checkpoint store.", - "GreedyPartitionLoadBalancer created. Watching all." - ], + ["EventHubConsumerClient subscribing to all partitions, no checkpoint store."], [ logger.verbose as debug.Debugger, logger.verbose as debug.Debugger, @@ -647,7 +921,7 @@ describe("EventHubConsumerClient", () => { logTester.assert(); }); - it("Receive from all partitions, coordinating with the same partition manager and using the FairPartitionLoadBalancer", async function(): Promise< + it("Receive from all partitions, coordinating with the same partition manager and using the default LoadBalancingStrategy", async function(): Promise< void > { // fast forward our partition manager so it starts reading from the latest offset @@ -665,14 +939,16 @@ describe("EventHubConsumerClient", () => { ] ); + const checkpointStore = new InMemoryCheckpointStore(); + clients.push( new EventHubConsumerClient( EventHubConsumerClient.defaultConsumerGroupName, service.connectionString!, service.path, // specifying your own checkpoint store activates the "production ready" code path that - // also uses the FairPartitionLoadBalancer - new InMemoryCheckpointStore() + // also uses the BalancedLoadBalancingStrategy + checkpointStore ) ); @@ -689,8 +965,79 @@ describe("EventHubConsumerClient", () => { service.connectionString!, service.path, // specifying your own checkpoint store activates the "production ready" code path that - // also uses the FairPartitionLoadBalancer - new InMemoryCheckpointStore() + // also uses the BalancedLoadBalancingStrategy + checkpointStore + ) + ); + + const subscriber2 = clients[1].subscribe(tester, { + startPosition: latestEventPosition + }); + subscriptions.push(subscriber2); + + await tester.runTestAndPoll(producerClient); + + // or else we won't see the abandoning message + for (const subscription of subscriptions) { + await subscription.close(); + } + logTester.assert(); + }); + + it("Receive from all partitions, coordinating with the same partition manager and using the GreedyLoadBalancingStrategy", async function(): Promise< + void + > { + // fast forward our partition manager so it starts reading from the latest offset + // instead of the beginning of time. + const logTester = new LogTester( + [ + "EventHubConsumerClient subscribing to all partitions, using a checkpoint store.", + /Starting event processor with ID /, + "Abandoning owned partitions" + ], + [ + logger.verbose as debug.Debugger, + logger.verbose as debug.Debugger, + logger.verbose as debug.Debugger + ] + ); + + const checkpointStore = new InMemoryCheckpointStore(); + + clients.push( + new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString!, + service.path, + // specifying your own checkpoint store activates the "production ready" code path that + checkpointStore, + { + loadBalancingOptions: { + strategy: "greedy" + } + } + ) + ); + + const tester = new ReceivedMessagesTester(partitionIds, true); + + const subscriber1 = clients[0].subscribe(tester, { + startPosition: latestEventPosition + }); + subscriptions.push(subscriber1); + + clients.push( + new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString!, + service.path, + // specifying your own checkpoint store activates the "production ready" code path that + checkpointStore, + { + loadBalancingOptions: { + strategy: "greedy" + } + } ) ); diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 6fbe6112787e..b7ee6c80f8bb 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -31,18 +31,22 @@ import { SubscriptionHandlerForTests, sendOneMessagePerPartition } from "./utils/subscriptionHandlerForTests"; -import { GreedyPartitionLoadBalancer, PartitionLoadBalancer } from "../src/partitionLoadBalancer"; import { AbortError, AbortSignal } from "@azure/abort-controller"; import { FakeSubscriptionEventHandlers } from "./utils/fakeSubscriptionEventHandlers"; import { isLatestPosition } from "../src/eventPosition"; import { AbortController } from "@azure/abort-controller"; +import { UnbalancedLoadBalancingStrategy } from "../src/loadBalancerStrategies/unbalancedStrategy"; +import { BalancedLoadBalancingStrategy } from "../src/loadBalancerStrategies/balancedStrategy"; +import { GreedyLoadBalancingStrategy } from "../src/loadBalancerStrategies/greedyStrategy"; const env = getEnvVars(); describe("Event Processor", function(): void { const defaultOptions: FullEventProcessorOptions = { maxBatchSize: 1, maxWaitTimeInSeconds: 60, - ownerLevel: 0 + ownerLevel: 0, + loopIntervalInMs: 10000, + loadBalancingStrategy: new UnbalancedLoadBalancingStrategy() }; const service = { @@ -74,6 +78,7 @@ describe("Event Processor", function(): void { afterEach("close the connection", async function(): Promise { await producerClient.close(); + await consumerClient.close(); }); describe("unit tests", () => { @@ -171,7 +176,9 @@ describe("Event Processor", function(): void { { startPosition, maxBatchSize: 1, - maxWaitTimeInSeconds: 1 + maxWaitTimeInSeconds: 1, + loadBalancingStrategy: defaultOptions.loadBalancingStrategy, + loopIntervalInMs: defaultOptions.loopIntervalInMs } ); } @@ -294,6 +301,10 @@ describe("Event Processor", function(): void { isReceivingFromPartition() { return false; + }, + + receivingFromPartitions() { + return []; } }; @@ -381,7 +392,8 @@ describe("Event Processor", function(): void { isReceivingFromPartition() { return false; } - } + }, + loadBalancingStrategy: new BalancedLoadBalancingStrategy(60000) } ); @@ -391,10 +403,16 @@ describe("Event Processor", function(): void { // we'll let one more go through just to make sure we're not going to // pick up an extra surprise partition // - // This particular behavior is really specific to the FairPartitionLoadBalancer but that's okay for now. - const numTimesAbortedIsCheckedInLoop = 4; + // There are 6 places where the abort signal is checked during the loop: + // - while condition + // - getEventHubProperties + // - _performLoadBalancing (start) + // - _performLoadBalancing (after listOwnership) + // - _performLoadBalancing (passed to _claimOwnership) + // - delay + const numTimesAbortedIsCheckedInLoop = 6; await ep["_runLoopWithLoadBalancing"]( - ep["_processingTarget"] as PartitionLoadBalancer, + ep["_loadBalancingStrategy"], triggerAbortedSignalAfterNumCalls(partitionIds.length * numTimesAbortedIsCheckedInLoop) ); @@ -480,6 +498,7 @@ describe("Event Processor", function(): void { it("claimOwnership throws and is reported to the user", async () => { const errors = []; + const partitionIds = await consumerClient.getPartitionIds(); const faultyCheckpointStore: CheckpointStore = { listOwnership: async () => [], @@ -501,8 +520,7 @@ describe("Event Processor", function(): void { }, faultyCheckpointStore, { - ...defaultOptions, - processingTarget: new GreedyPartitionLoadBalancer(["0"]) + ...defaultOptions } ); @@ -518,7 +536,7 @@ describe("Event Processor", function(): void { until: async () => errors.length !== 0 }); - errors.length.should.equal(1); + errors.length.should.equal(partitionIds.length); } finally { // this will also fail - we "abandon" all claimed partitions at // when a processor is stopped (which requires us to claim them @@ -533,19 +551,33 @@ describe("Event Processor", function(): void { it("errors thrown from the user's handlers are reported to processError()", async () => { const errors = new Set(); + const partitionIds = await consumerClient.getPartitionIds(); + + const processCloseErrorMessage = "processClose() error"; + const processEventsErrorMessage = "processEvents() error"; + const processInitializeErrorMessage = "processInitialize() error"; + const expectedErrorMessages: string[] = []; + for (let i = 0; i < partitionIds.length; i++) { + expectedErrorMessages.push( + processCloseErrorMessage, + processEventsErrorMessage, + processInitializeErrorMessage + ); + } + expectedErrorMessages.sort(); const eventProcessor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { processClose: async () => { - throw new Error("processClose() error"); + throw new Error(processCloseErrorMessage); }, processEvents: async () => { - throw new Error("processEvents() error"); + throw new Error(processEventsErrorMessage); }, processInitialize: async () => { - throw new Error("processInitialize() error"); + throw new Error(processInitializeErrorMessage); }, processError: async (err, _) => { errors.add(err); @@ -555,7 +587,6 @@ describe("Event Processor", function(): void { new InMemoryCheckpointStore(), { ...defaultOptions, - processingTarget: new GreedyPartitionLoadBalancer(["0"]), startPosition: earliestEventPosition } ); @@ -569,17 +600,13 @@ describe("Event Processor", function(): void { name: "waiting for errors thrown from user's handlers", timeBetweenRunsMs: 1000, maxTimes: 30, - until: async () => errors.size >= 3 + until: async () => errors.size >= partitionIds.length * 3 }); const messages = [...errors].map((e) => e.message); messages.sort(); - messages.should.deep.equal([ - "processClose() error", - "processEvents() error", - "processInitialize() error" - ]); + messages.should.deep.equal(expectedErrorMessages); } finally { await eventProcessor.stop(); } @@ -637,7 +664,6 @@ describe("Event Processor", function(): void { new InMemoryCheckpointStore(), { ...defaultOptions, - processingTarget: new GreedyPartitionLoadBalancer(), startPosition: startPosition } ); @@ -694,7 +720,6 @@ describe("Event Processor", function(): void { subscriptionEventHandler, startPosition } = await SubscriptionHandlerForTests.startingFromHere(producerClient); - const partitionLoadBalancer = new GreedyPartitionLoadBalancer(); const processor = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, @@ -702,7 +727,6 @@ describe("Event Processor", function(): void { subscriptionEventHandler, new InMemoryCheckpointStore(), { - processingTarget: partitionLoadBalancer, ...defaultOptions, startPosition: startPosition } @@ -756,7 +780,6 @@ describe("Event Processor", function(): void { new InMemoryCheckpointStore(), { ...defaultOptions, - processingTarget: new GreedyPartitionLoadBalancer(), startPosition: startPosition } ); @@ -1059,7 +1082,164 @@ describe("Event Processor", function(): void { ); }); - it("should 'steal' partitions until all the processors have reached a steady-state", async function(): Promise< + it("should 'steal' partitions until all the processors have reached a steady-state (BalancedLoadBalancingStrategy)", async function(): Promise< + void + > { + loggerForTest("starting up the stealing test"); + + const processorByName: Dictionary = {}; + const checkpointStore = new InMemoryCheckpointStore(); + const partitionIds = await producerClient.getPartitionIds(); + const partitionOwnershipArr = new Set(); + + const partitionResultsMap = new Map< + string, + { events: string[]; initialized: boolean; closeReason?: CloseReason } + >(); + partitionIds.forEach((id) => partitionResultsMap.set(id, { events: [], initialized: false })); + let didGetReceiverDisconnectedError = false; + + // The partitionProcess will need to add events to the partitionResultsMap as they are received + class FooPartitionProcessor implements Required { + async processInitialize(context: PartitionContext) { + loggerForTest(`processInitialize(${context.partitionId})`); + partitionResultsMap.get(context.partitionId)!.initialized = true; + } + async processClose(reason: CloseReason, context: PartitionContext) { + loggerForTest(`processClose(${context.partitionId})`); + partitionResultsMap.get(context.partitionId)!.closeReason = reason; + } + async processEvents(events: ReceivedEventData[], context: PartitionContext) { + partitionOwnershipArr.add(context.partitionId); + const existingEvents = partitionResultsMap.get(context.partitionId)!.events; + existingEvents.push(...events.map((event) => event.body)); + } + async processError(err: Error, context: PartitionContext) { + loggerForTest(`processError(${context.partitionId})`); + const errorName = (err as any).code; + if (errorName === "ReceiverDisconnectedError") { + didGetReceiverDisconnectedError = true; + } + } + } + + // create messages + const expectedMessagePrefix = "EventProcessor test - multiple partitions - "; + for (const partitionId of partitionIds) { + await producerClient.sendBatch([{ body: expectedMessagePrefix + partitionId }], { + partitionId + }); + } + + const processor1LoadBalancingInterval = { + loopIntervalInMs: 1000 + }; + + // working around a potential deadlock - this allows `processor-2` to more + // aggressively pursue getting its required partitions and avoid being in + // lockstep with `processor-1` + const processor2LoadBalancingInterval = { + loopIntervalInMs: processor1LoadBalancingInterval.loopIntervalInMs / 2 + }; + + processorByName[`processor-1`] = new EventProcessor( + EventHubConsumerClient.defaultConsumerGroupName, + consumerClient["_context"], + new FooPartitionProcessor(), + checkpointStore, + { + ...defaultOptions, + startPosition: earliestEventPosition, + ...processor1LoadBalancingInterval, + loadBalancingStrategy: new BalancedLoadBalancingStrategy(60000) + } + ); + + processorByName[`processor-1`].start(); + + await loopUntil({ + name: "All partitions are owned", + maxTimes: 60, + timeBetweenRunsMs: 1000, + until: async () => partitionOwnershipArr.size === partitionIds.length, + errorMessageFn: () => `${partitionOwnershipArr.size}/${partitionIds.length}` + }); + + processorByName[`processor-2`] = new EventProcessor( + EventHubConsumerClient.defaultConsumerGroupName, + consumerClient["_context"], + new FooPartitionProcessor(), + checkpointStore, + { + ...defaultOptions, + startPosition: earliestEventPosition, + ...processor2LoadBalancingInterval, + loadBalancingStrategy: new BalancedLoadBalancingStrategy(60000) + } + ); + + partitionOwnershipArr.size.should.equal(partitionIds.length); + processorByName[`processor-2`].start(); + + await loopUntil({ + name: "Processors are balanced", + maxTimes: 60, + timeBetweenRunsMs: 1000, + until: async () => { + // it should be impossible for 'processor-2' to have obtained the number of + // partitions it needed without having stolen some from 'processor-1' + // so if we haven't see any `ReceiverDisconnectedError`'s then that stealing + // hasn't occurred yet. + if (!didGetReceiverDisconnectedError) { + return false; + } + + const partitionOwnership = await checkpointStore.listOwnership( + consumerClient.fullyQualifiedNamespace, + consumerClient.eventHubName, + EventHubConsumerClient.defaultConsumerGroupName + ); + + // map of ownerId as a key and partitionIds as a value + const partitionOwnershipMap: Map = ownershipListToMap( + partitionOwnership + ); + + // if stealing has occurred we just want to make sure that _all_ + // the stealing has completed. + const isBalanced = (friendlyName: string) => { + const n = Math.floor(partitionIds.length / 2); + const numPartitions = partitionOwnershipMap.get(processorByName[friendlyName].id)! + .length; + return numPartitions == n || numPartitions == n + 1; + }; + + if (!isBalanced(`processor-1`) || !isBalanced(`processor-2`)) { + return false; + } + + return true; + } + }); + + for (const processor in processorByName) { + await processorByName[processor].stop(); + } + + // now that all the dust has settled let's make sure that + // a. we received some events from each partition (doesn't matter which processor) + // did the work + // b. each partition was initialized + // c. each partition should have received at least one shutdown event + for (const partitionId of partitionIds) { + const results = partitionResultsMap.get(partitionId)!; + results.events.length.should.be.gte(1); + results.initialized.should.be.true; + (results.closeReason === CloseReason.Shutdown).should.be.true; + } + }); + + it("should 'steal' partitions until all the processors have reached a steady-state (GreedyLoadBalancingStrategy)", async function(): Promise< void > { loggerForTest("starting up the stealing test"); @@ -1127,7 +1307,8 @@ describe("Event Processor", function(): void { { ...defaultOptions, startPosition: earliestEventPosition, - ...processor1LoadBalancingInterval + ...processor1LoadBalancingInterval, + loadBalancingStrategy: new GreedyLoadBalancingStrategy(60000) } ); @@ -1149,7 +1330,8 @@ describe("Event Processor", function(): void { { ...defaultOptions, startPosition: earliestEventPosition, - ...processor2LoadBalancingInterval + ...processor2LoadBalancingInterval, + loadBalancingStrategy: new GreedyLoadBalancingStrategy(60000) } ); @@ -1214,7 +1396,7 @@ describe("Event Processor", function(): void { } }); - it("should ensure that all the processors reach a steady-state where all partitions are being processed", async function(): Promise< + it("should ensure that all the processors reach a steady-state where all partitions are being processed (BalancedLoadBalancingStrategy)", async function(): Promise< void > { const processorByName: Dictionary = {}; @@ -1243,14 +1425,16 @@ describe("Event Processor", function(): void { for (let i = 0; i < 2; i++) { const processorName = `processor-${i}`; - processorByName[ - processorName - ] = new EventProcessor( + processorByName[processorName] = new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], new FooPartitionProcessor(), checkpointStore, - { ...defaultOptions, startPosition: earliestEventPosition } + { + ...defaultOptions, + startPosition: earliestEventPosition, + loadBalancingStrategy: new BalancedLoadBalancingStrategy(60000) + } ); processorByName[processorName].start(); await delay(12000); @@ -1293,7 +1477,84 @@ describe("Event Processor", function(): void { partitionOwnershipMap.get(processorByName[`processor-1`].id)!.length.should.oneOf([n, n + 1]); }); - it("should ensure that all the processors maintain a steady-state when all partitions are being processed", async function(): Promise< + it("should ensure that all the processors reach a steady-state where all partitions are being processed (GreedyLoadBalancingStrategy)", async function(): Promise< + void + > { + const processorByName: Dictionary = {}; + const partitionIds = await producerClient.getPartitionIds(); + const checkpointStore = new InMemoryCheckpointStore(); + const partitionOwnershipArr = new Set(); + + // The partitionProcess will need to add events to the partitionResultsMap as they are received + class FooPartitionProcessor { + async processEvents(_events: ReceivedEventData[], context: PartitionContext) { + partitionOwnershipArr.add(context.partitionId); + } + async processError() {} + } + + // create messages + const expectedMessagePrefix = "EventProcessor test - multiple partitions - "; + for (const partitionId of partitionIds) { + await producerClient.sendBatch([{ body: expectedMessagePrefix + partitionId }], { + partitionId + }); + } + + for (let i = 0; i < 2; i++) { + const processorName = `processor-${i}`; + processorByName[processorName] = new EventProcessor( + EventHubConsumerClient.defaultConsumerGroupName, + consumerClient["_context"], + new FooPartitionProcessor(), + checkpointStore, + { + ...defaultOptions, + startPosition: earliestEventPosition, + loadBalancingStrategy: new GreedyLoadBalancingStrategy(60000) + } + ); + processorByName[processorName].start(); + await delay(12000); + } + + await loopUntil({ + name: "partitionownership", + timeBetweenRunsMs: 5000, + maxTimes: 10, + until: async () => partitionOwnershipArr.size === partitionIds.length + }); + + // map of ownerId as a key and partitionIds as a value + const partitionOwnershipMap: Map = new Map(); + + const partitionOwnership = await checkpointStore.listOwnership( + consumerClient.fullyQualifiedNamespace, + consumerClient.eventHubName, + EventHubConsumerClient.defaultConsumerGroupName + ); + + partitionOwnershipArr.size.should.equal(partitionIds.length); + for (const processor in processorByName) { + await processorByName[processor].stop(); + } + + for (const ownership of partitionOwnership) { + if (!partitionOwnershipMap.has(ownership.ownerId)) { + partitionOwnershipMap.set(ownership.ownerId, [ownership.partitionId]); + } else { + const arr = partitionOwnershipMap.get(ownership.ownerId); + arr!.push(ownership.partitionId); + partitionOwnershipMap.set(ownership.ownerId, arr!); + } + } + + const n = Math.floor(partitionIds.length / 2); + partitionOwnershipMap.get(processorByName[`processor-0`].id)!.length.should.oneOf([n, n + 1]); + partitionOwnershipMap.get(processorByName[`processor-1`].id)!.length.should.oneOf([n, n + 1]); + }); + + it("should ensure that all the processors maintain a steady-state when all partitions are being processed (BalancedLoadBalancingStrategy)", async function(): Promise< void > { const partitionIds = await producerClient.getPartitionIds(); @@ -1349,7 +1610,171 @@ describe("Event Processor", function(): void { inactiveTimeLimitInMs: 3000, ownerLevel: 0, // For this test we don't want to actually checkpoint, just test ownership. - startPosition: latestEventPosition + startPosition: latestEventPosition, + loadBalancingStrategy: new BalancedLoadBalancingStrategy(60000) + }; + + const processor1 = new EventProcessor( + EventHubConsumerClient.defaultConsumerGroupName, + consumerClient["_context"], + handlers, + checkpointStore, + eventProcessorOptions + ); + + const processor2 = new EventProcessor( + EventHubConsumerClient.defaultConsumerGroupName, + consumerClient["_context"], + handlers, + checkpointStore, + eventProcessorOptions + ); + + processor1.start(); + processor2.start(); + + // loop until all partitions are claimed + try { + let lastLoopError: Record = {}; + + await loopUntil({ + name: "partitionOwnership", + maxTimes: 30, + timeBetweenRunsMs: 10000, + + errorMessageFn: () => JSON.stringify(lastLoopError, undefined, " "), + until: async () => { + // Ensure the partition ownerships are balanced. + const eventProcessorIds = Object.keys(claimedPartitionsMap); + + // There are 2 processors, so we should see 2 entries. + if (eventProcessorIds.length !== 2) { + lastLoopError = { + reason: "Not all event processors have shown up", + eventProcessorIds, + partitionOwnershipHistory + }; + return false; + } + + const aProcessorPartitions = claimedPartitionsMap[eventProcessorIds[0]]; + const bProcessorPartitions = claimedPartitionsMap[eventProcessorIds[1]]; + + // The delta between number of partitions each processor owns can't be more than 1. + if (Math.abs(aProcessorPartitions.size - bProcessorPartitions.size) > 1) { + lastLoopError = { + reason: "Delta between partitions is greater than 1", + a: Array.from(aProcessorPartitions), + b: Array.from(bProcessorPartitions), + partitionOwnershipHistory + }; + return false; + } + + // All partitions must be claimed. + const allPartitionsClaimed = + aProcessorPartitions.size + bProcessorPartitions.size === partitionIds.length; + + if (!allPartitionsClaimed) { + lastLoopError = { + reason: "All partitions not claimed", + partitionIds, + a: Array.from(aProcessorPartitions), + b: Array.from(bProcessorPartitions), + partitionOwnershipHistory + }; + } + + return allPartitionsClaimed; + } + }); + } catch (err) { + // close processors + await Promise.all([processor1.stop(), processor2.stop()]); + throw err; + } + + loggerForTest(`All partitions have been claimed.`); + allPartitionsClaimed = true; + + try { + // loop for some time to see if thrashing occurs + await loopUntil({ + name: "partitionThrash", + maxTimes: 4, + timeBetweenRunsMs: 1000, + until: async () => thrashAfterSettling + }); + } catch (err) { + // swallow error, check trashAfterSettling for the condition in finally + } finally { + await Promise.all([processor1.stop(), processor2.stop()]); + should.equal( + thrashAfterSettling, + false, + "Detected PartitionOwnership thrashing after load-balancing has settled." + ); + } + }); + + it("should ensure that all the processors maintain a steady-state when all partitions are being processed (GreedyLoadBalancingStrategy)", async function(): Promise< + void + > { + const partitionIds = await producerClient.getPartitionIds(); + const checkpointStore = new InMemoryCheckpointStore(); + const claimedPartitionsMap = {} as { [eventProcessorId: string]: Set }; + + const partitionOwnershipHistory: string[] = []; + + let allPartitionsClaimed = false; + let thrashAfterSettling = false; + const handlers: SubscriptionEventHandlers = { + async processInitialize(context) { + const eventProcessorId: string = (context as any).eventProcessorId; + const partitionId = context.partitionId; + + partitionOwnershipHistory.push(`${eventProcessorId}: init ${partitionId}`); + + loggerForTest(`[${eventProcessorId}] Claimed partition ${partitionId}`); + if (allPartitionsClaimed) { + thrashAfterSettling = true; + return; + } + + const claimedPartitions = claimedPartitionsMap[eventProcessorId] || new Set(); + claimedPartitions.add(partitionId); + claimedPartitionsMap[eventProcessorId] = claimedPartitions; + }, + async processEvents() {}, + async processError() {}, + async processClose(reason, context) { + const eventProcessorId: string = (context as any).eventProcessorId; + const partitionId = context.partitionId; + const claimedPartitions = claimedPartitionsMap[eventProcessorId]; + claimedPartitions.delete(partitionId); + loggerForTest( + `[${(context as any).eventProcessorId}] processClose(${reason}) on partition ${ + context.partitionId + }` + ); + if (reason === CloseReason.OwnershipLost && allPartitionsClaimed) { + loggerForTest( + `[${(context as any).eventProcessorId}] Lost partition ${context.partitionId}` + ); + thrashAfterSettling = true; + } + } + }; + + const eventProcessorOptions: FullEventProcessorOptions = { + maxBatchSize: 1, + maxWaitTimeInSeconds: 5, + loopIntervalInMs: 1000, + inactiveTimeLimitInMs: 3000, + ownerLevel: 0, + // For this test we don't want to actually checkpoint, just test ownership. + startPosition: latestEventPosition, + loadBalancingStrategy: new GreedyLoadBalancingStrategy(60000) }; const processor1 = new EventProcessor( diff --git a/sdk/eventhub/event-hubs/test/loadBalancer.spec.ts b/sdk/eventhub/event-hubs/test/loadBalancer.spec.ts deleted file mode 100644 index 25bcd01a96d5..000000000000 --- a/sdk/eventhub/event-hubs/test/loadBalancer.spec.ts +++ /dev/null @@ -1,310 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -import { - FairPartitionLoadBalancer, - GreedyPartitionLoadBalancer -} from "../src/partitionLoadBalancer"; -import { PartitionOwnership } from "../src/eventProcessor"; - -describe("PartitionLoadBalancer", () => { - describe("GreedyPartitionLoadBalancer", () => { - it("all", () => { - const m = new Map(); - const lb = new GreedyPartitionLoadBalancer(); - - lb.loadBalance("ownerId", m, ["1", "2", "3"]).should.deep.eq(["1", "2", "3"]); - m.should.be.empty; - }); - - it("filtered", () => { - const m = new Map(); - const lb = new GreedyPartitionLoadBalancer(["2"]); - - lb.loadBalance("ownerId", m, ["1", "2", "3"]).should.deep.eq(["2"]); - m.should.be.empty; - }); - - it("claim partitions we already own", () => { - const m = new Map(); - - m.set("1", { - consumerGroup: "", - fullyQualifiedNamespace: "", - eventHubName: "", - // we already own this so we won't - // try to reclaim it. - ownerId: "ownerId", - partitionId: "" - }); - - m.set("2", { - consumerGroup: "", - fullyQualifiedNamespace: "", - eventHubName: "", - // owned by someone else - we'll steal this - // partition - ownerId: "someOtherOwnerId", - partitionId: "" - }); - - const lb = new GreedyPartitionLoadBalancer(["1", "2", "3"]); - - lb.loadBalance("ownerId", m, ["1", "2", "3"]).should.deep.eq(["1", "2", "3"]); - }); - }); - - describe("FairPartitionLoadBalancer", () => { - const lb = new FairPartitionLoadBalancer(1000 * 60); - - it("odd number of partitions per processor", () => { - const allPartitions = ["0", "1", "2"]; - - // at this point 'a' has it's fair share of partitions (there are 3 total) - // and it's okay to have 1 extra. - let partitionsToOwn = lb.loadBalance( - "a", - createOwnershipMap({ - "1": "b", - "2": "a", - "3": "a" - }), - allPartitions - ); - partitionsToOwn.sort(); - partitionsToOwn.should.be.deep.equal( - ["2", "3"], - "we've gotten our fair share, shouldn't claim anything new" - ); - - // now the other side of this is when we're fighting for the ownership of an - // extra partition - partitionsToOwn = lb.loadBalance( - "a", - createOwnershipMap({ - "1": "b", - "2": "a" - }), - allPartitions - ); - partitionsToOwn.sort(); - partitionsToOwn.should.be.deep.equal( - ["0", "2"], - "we had our minimum fair share (1) but there's still one extra (uneven number of partitions per processor) and we should snag it" - ); - }); - - it("even number of partitions per processor", () => { - const allPartitions = ["0", "1", "2", "3"]; - - // at this point 'a' has it's fair share of partitions (there are 4 total) - // so it'll stop claiming additional partitions. - let partitionsToOwn = lb.loadBalance( - "a", - createOwnershipMap({ - "1": "b", - "2": "a", - "3": "a" - }), - allPartitions - ); - partitionsToOwn.sort(); - partitionsToOwn.should.be.deep.equal( - ["2", "3"], - "we've gotten our fair share, shouldn't claim anything new" - ); - - partitionsToOwn = lb.loadBalance( - "a", - createOwnershipMap({ - "0": "b", - "1": "b", - "2": "a", - "3": "a" - }), - allPartitions - ); - partitionsToOwn.sort(); - partitionsToOwn.should.be.deep.equal(["2", "3"], "load is balanced, won't grab any more."); - }); - - // when there are no freely available partitions (partitions that have either expired or are literally unowned) - // we'll need to steal from an existing processor. - // This can happen in a few ways: - // 1. we were simply racing against other processors - // 2. we're coming in later after all partitions have been allocated (ie, scaling out) - // 3. timing issues, death of a processor, etc... - it("stealing", () => { - // something like this could happen if 'a' were just the only processor - // and now we're spinning up 'b' - let partitionsToOwn = lb.loadBalance( - "b", - createOwnershipMap({ - "0": "a", - "1": "a", - "2": "a" - }), - ["0", "1", "2"] - ); - partitionsToOwn.sort(); - // we'll attempt to steal a partition from 'a'. - partitionsToOwn.length.should.equal( - 1, - "stealing with an odd number of partitions per processor" - ); - - // and now the same case as above, but with an even number of partitions per processor. - partitionsToOwn = lb.loadBalance( - "b", - createOwnershipMap({ - "0": "a", - "1": "a", - "2": "a", - "3": "a" - }), - ["0", "1", "2", "3"] - ); - partitionsToOwn.sort(); - // we'll attempt to steal a partition from 'a'. - partitionsToOwn.length.should.equal( - 1, - "stealing with an even number of partitions per processor" - ); - }); - - it("don't steal when you can just wait", () => { - // @chradek's case: let's say we have this partition layout: - // AAAABBBCCD - // - // Before, we'd let 'C' steal from 'A' - we see that we don't have enough - // +1 processors(exact match) and so 'C' attempts to become one. This can - // lead to some unnecessary thrash as 'A' loses partitions to a processor - // that has technically already met it's quota. - // - // Instead, we treat 'A' is a +1-ish specifically for when we ('C') - // are checking if we want to grab more partitions. - // - // This allows 'A' to just naturally decline as _actual_ processors grab - // their minimum required partitions rather than forcing it and possibly - // having a partition have to juggle between partitions as they try to - // meet the minimum. - const partitions = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]; - - const lb = new FairPartitionLoadBalancer(1000 * 60); - - // we'll do 4 consumers - const initialOwnershipMap = createOwnershipMap({ - "0": "a", - "1": "a", - "2": "a", - "3": "a", - - "4": "b", - "5": "b", - "6": "b", - - "7": "c", - "8": "c", - - "9": "d" - }); - - const requestedPartitions = lb.loadBalance("c", initialOwnershipMap, partitions); - requestedPartitions.sort(); - - requestedPartitions.should.deep.equal( - ["7", "8"], - "c will not steal one partition since it sees that, eventually, 'a' will lose its partitions and become a +1 processor on it's own" - ); - }); - - it("avoid thrash", () => { - // this is a case where we shouldn't steal - we have - // the minimum number of partitions and stealing at this - // point will just keep thrashing both processors. - const partitionsToOwn = lb.loadBalance( - "b", - createOwnershipMap({ - "0": "a", - "1": "b", - "2": "a" - }), - ["0", "1", "2"] - ); - - partitionsToOwn.sort(); - partitionsToOwn.should.deep.equal(["1"], "should not re-steal when things are balanced"); - }); - - it("general cases", () => { - const allPartitions = ["0", "1", "2", "3"]; - - // in the presence of no owners we claim a random partition - let partitionsToOwn = lb.loadBalance("a", createOwnershipMap({}), allPartitions); - partitionsToOwn.length.should.be.equal(1, "nothing is owned, claim one"); - - // if there are other owners we should claim up to #partitions/#owners - partitionsToOwn = lb.loadBalance( - "a", - createOwnershipMap({ - "1": "b", - "3": "a" - }), - allPartitions - ); - partitionsToOwn.length.should.be.equal(2, "1 and 1 with another owner, should claim one"); - // better not try to claim 'b's partition when there are unowned partitions - partitionsToOwn.filter((p) => p === "1").length.should.equal(0); - - // 'b' should claim the last unowned partition - partitionsToOwn = lb.loadBalance( - "b", - createOwnershipMap({ - "1": "b", - "2": "a", - "3": "a" - }), - allPartitions - ); - partitionsToOwn.sort(); - partitionsToOwn.should.be.deep.equal(["0", "1"], "b grabbed the last available partition"); - - // we're balanced - processors now only grab the partitions that they own - partitionsToOwn = lb.loadBalance( - "b", - createOwnershipMap({ - "0": "b", - "1": "a", - "2": "b", - "3": "a" - }), - allPartitions - ); - partitionsToOwn.sort(); - partitionsToOwn.should.be.deep.equal( - ["0", "2"], - "balanced: b only grabbed it's already owned partitions" - ); - }); - - function createOwnershipMap( - partitionToOwner: Record - ): Map { - const ownershipMap = new Map(); - - for (const partitionId in partitionToOwner) { - ownershipMap.set(partitionId, { - consumerGroup: "$Default", - eventHubName: "eventhubname1", - fullyQualifiedNamespace: "fqdn", - ownerId: partitionToOwner[partitionId], - partitionId: partitionId, - etag: "etag", - lastModifiedTimeInMs: Date.now() - }); - } - - return ownershipMap; - } - }); -}); diff --git a/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts b/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts new file mode 100644 index 000000000000..927376da4d2b --- /dev/null +++ b/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts @@ -0,0 +1,603 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { PartitionOwnership } from "../src/eventProcessor"; +import { BalancedLoadBalancingStrategy } from "../src/loadBalancerStrategies/balancedStrategy"; +import { GreedyLoadBalancingStrategy } from "../src/loadBalancerStrategies/greedyStrategy"; +import { UnbalancedLoadBalancingStrategy } from "../src/loadBalancerStrategies/unbalancedStrategy"; + +describe("LoadBalancingStrategy", () => { + function createOwnershipMap( + partitionToOwner: Record + ): Map { + const ownershipMap = new Map(); + + for (const partitionId in partitionToOwner) { + ownershipMap.set(partitionId, { + consumerGroup: "$Default", + eventHubName: "eventhubname1", + fullyQualifiedNamespace: "fqdn", + ownerId: partitionToOwner[partitionId], + partitionId: partitionId, + etag: "etag", + lastModifiedTimeInMs: Date.now() + }); + } + + return ownershipMap; + } + + describe("UnbalancedLoadBalancingStrategy", () => { + it("all", () => { + const m = new Map(); + const lb = new UnbalancedLoadBalancingStrategy(); + + lb.getPartitionsToCliam("ownerId", m, ["1", "2", "3"]).should.deep.eq(["1", "2", "3"]); + m.should.be.empty; + }); + + it("claim partitions we already own", () => { + const m = new Map(); + + m.set("1", { + consumerGroup: "", + fullyQualifiedNamespace: "", + eventHubName: "", + // we already own this so we won't + // try to reclaim it. + ownerId: "ownerId", + partitionId: "" + }); + + m.set("2", { + consumerGroup: "", + fullyQualifiedNamespace: "", + eventHubName: "", + // owned by someone else - we'll steal this + // partition + ownerId: "someOtherOwnerId", + partitionId: "" + }); + + const lb = new UnbalancedLoadBalancingStrategy(); + + lb.getPartitionsToCliam("ownerId", m, ["1", "2", "3"]).should.deep.eq(["1", "2", "3"]); + }); + }); + + describe("BalancedLoadBalancingStrategy", () => { + const lb = new BalancedLoadBalancingStrategy(1000 * 60); + + it("odd number of partitions per processor", () => { + const allPartitions = ["0", "1", "2"]; + + // at this point 'a' has it's fair share of partitions (there are 3 total) + // and it's okay to have 1 extra. + let partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "1": "b", + "2": "a", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal( + [], + "we've gotten our fair share, shouldn't claim anything new" + ); + + // now the other side of this is when we're fighting for the ownership of an + // extra partition + partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "1": "b", + "2": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal( + ["0"], + "we had our minimum fair share (1) but there's still one extra (uneven number of partitions per processor) and we should snag it" + ); + }); + + it("even number of partitions per processor", () => { + const allPartitions = ["0", "1", "2", "3"]; + + // at this point 'a' has it's fair share of partitions (there are 4 total) + // so it'll stop claiming additional partitions. + let partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "1": "b", + "2": "a", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal( + [], + "we've gotten our fair share, shouldn't claim anything new" + ); + + partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "0": "b", + "1": "b", + "2": "a", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal([], "load is balanced, won't grab any more."); + }); + + // when there are no freely available partitions (partitions that have either expired or are literally unowned) + // we'll need to steal from an existing processor. + // This can happen in a few ways: + // 1. we were simply racing against other processors + // 2. we're coming in later after all partitions have been allocated (ie, scaling out) + // 3. timing issues, death of a processor, etc... + it("stealing", () => { + // something like this could happen if 'a' were just the only processor + // and now we're spinning up 'b' + let partitionsToOwn = lb.getPartitionsToCliam( + "b", + createOwnershipMap({ + "0": "a", + "1": "a", + "2": "a" + }), + ["0", "1", "2"] + ); + partitionsToOwn.sort(); + // we'll attempt to steal a partition from 'a'. + partitionsToOwn.length.should.equal( + 1, + "stealing with an odd number of partitions per processor" + ); + + // and now the same case as above, but with an even number of partitions per processor. + partitionsToOwn = lb.getPartitionsToCliam( + "b", + createOwnershipMap({ + "0": "a", + "1": "a", + "2": "a", + "3": "a" + }), + ["0", "1", "2", "3"] + ); + partitionsToOwn.sort(); + // we'll attempt to steal a partition from 'a'. + partitionsToOwn.length.should.equal( + 1, + "stealing with an even number of partitions per processor" + ); + }); + + it("don't steal when you can just wait", () => { + // @chradek's case: let's say we have this partition layout: + // AAAABBBCCD + // + // Before, we'd let 'C' steal from 'A' - we see that we don't have enough + // +1 processors(exact match) and so 'C' attempts to become one. This can + // lead to some unnecessary thrash as 'A' loses partitions to a processor + // that has technically already met it's quota. + // + // Instead, we treat 'A' is a +1-ish specifically for when we ('C') + // are checking if we want to grab more partitions. + // + // This allows 'A' to just naturally decline as _actual_ processors grab + // their minimum required partitions rather than forcing it and possibly + // having a partition have to juggle between partitions as they try to + // meet the minimum. + const partitions = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]; + + const lb = new BalancedLoadBalancingStrategy(1000 * 60); + + // we'll do 4 consumers + const initialOwnershipMap = createOwnershipMap({ + "0": "a", + "1": "a", + "2": "a", + "3": "a", + + "4": "b", + "5": "b", + "6": "b", + + "7": "c", + "8": "c", + + "9": "d" + }); + + const requestedPartitions = lb.getPartitionsToCliam("c", initialOwnershipMap, partitions); + requestedPartitions.sort(); + + requestedPartitions.should.deep.equal( + [], + "c will not steal one partition since it sees that, eventually, 'a' will lose its partitions and become a +1 processor on it's own" + ); + }); + + it("avoid thrash", () => { + // this is a case where we shouldn't steal - we have + // the minimum number of partitions and stealing at this + // point will just keep thrashing both processors. + const partitionsToOwn = lb.getPartitionsToCliam( + "b", + createOwnershipMap({ + "0": "a", + "1": "b", + "2": "a" + }), + ["0", "1", "2"] + ); + + partitionsToOwn.sort(); + partitionsToOwn.should.deep.equal([], "should not re-steal when things are balanced"); + }); + + it("general cases", () => { + const allPartitions = ["0", "1", "2", "3"]; + + // in the presence of no owners we claim a random partition + let partitionsToOwn = lb.getPartitionsToCliam("a", createOwnershipMap({}), allPartitions); + partitionsToOwn.length.should.be.equal(1, "nothing is owned, claim one"); + + // if there are other owners we should claim up to #partitions/#owners + partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "1": "b", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.length.should.be.equal(1, "1 and 1 with another owner, should claim one"); + // better not try to claim 'b's partition when there are unowned partitions + partitionsToOwn.filter((p) => p === "1").length.should.equal(0); + + // 'b' should claim the last unowned partition + partitionsToOwn = lb.getPartitionsToCliam( + "b", + createOwnershipMap({ + "1": "b", + "2": "a", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal(["0"], "b grabbed the last available partition"); + + // we're balanced - processors now only grab the partitions that they own + partitionsToOwn = lb.getPartitionsToCliam( + "b", + createOwnershipMap({ + "0": "b", + "1": "a", + "2": "b", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal([], "balanced: b should not grab anymore partitions"); + }); + + it("honors the partitionOwnershipExpirationIntervalInMs", () => { + const intervalInMs = 1000; + const lb = new BalancedLoadBalancingStrategy(intervalInMs); + const allPartitions = ["0", "1"]; + const ownershipMap = createOwnershipMap({ + "0": "b", + "1": "a" + }); + + // At this point, 'a' has its fair share of partitions, and none should be returned. + let partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + partitionsToOwn.length.should.equal(0, "Expected to not claim any new partitions."); + + // Change the ownership of partition "0" so it is older than the interval. + const ownership = ownershipMap.get("0")!; + ownership.lastModifiedTimeInMs = Date.now() - (intervalInMs + 1); // Add 1 to the interval to ensure it has just expired. + + partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + partitionsToOwn.should.deep.equal(["0"]); + }); + }); + + describe("GreedyLoadBalancingStrategy", () => { + const lb = new GreedyLoadBalancingStrategy(1000 * 60); + + it("odd number of partitions per processor", () => { + const allPartitions = ["0", "1", "2"]; + + // at this point 'a' has it's fair share of partitions (there are 3 total) + // and it's okay to have 1 extra. + let partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "1": "b", + "2": "a", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal( + [], + "we've gotten our fair share, shouldn't claim anything new" + ); + + // now the other side of this is when we're fighting for the ownership of an + // extra partition + partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "1": "b", + "2": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal( + ["0"], + "we had our minimum fair share (1) but there's still one extra (uneven number of partitions per processor) and we should snag it" + ); + }); + + it("even number of partitions per processor", () => { + const allPartitions = ["0", "1", "2", "3"]; + + // at this point 'a' has it's fair share of partitions (there are 4 total) + // so it'll stop claiming additional partitions. + let partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "1": "b", + "2": "a", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal( + [], + "we've gotten our fair share, shouldn't claim anything new" + ); + + partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "0": "b", + "1": "b", + "2": "a", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal([], "load is balanced, won't grab any more."); + }); + + // when there are no freely available partitions (partitions that have either expired or are literally unowned) + // we'll need to steal from an existing processor. + // This can happen in a few ways: + // 1. we were simply racing against other processors + // 2. we're coming in later after all partitions have been allocated (ie, scaling out) + // 3. timing issues, death of a processor, etc... + it("stealing", () => { + // something like this could happen if 'a' were just the only processor + // and now we're spinning up 'b' + let partitionsToOwn = lb.getPartitionsToCliam( + "b", + createOwnershipMap({ + "0": "a", + "1": "a", + "2": "a" + }), + ["0", "1", "2"] + ); + partitionsToOwn.sort(); + // we'll attempt to steal a partition from 'a'. + partitionsToOwn.length.should.equal( + 1, + "stealing with an odd number of partitions per processor" + ); + + // and now the same case as above, but with an even number of partitions per processor. + partitionsToOwn = lb.getPartitionsToCliam( + "b", + createOwnershipMap({ + "0": "a", + "1": "a", + "2": "a", + "3": "a" + }), + ["0", "1", "2", "3"] + ); + partitionsToOwn.sort(); + // we'll attempt to steal a partition from 'a'. + partitionsToOwn.length.should.equal( + 2, + "stealing with an even number of partitions per processor" + ); + }); + + it("claims unowned then steals", () => { + const allPartitions = []; + for (let i = 0; i < 8; i++) { + allPartitions.push(`${i}`); + } + + let partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "0": "", + // skip 1, 2 + "3": "b", + "4": "b", + "5": "b", + "6": "b", + "7": "b" + }), + allPartitions + ); + partitionsToOwn.sort(); + // "a" should have 4 partitions in order to be balanced. + // Partitions "0", "1", "2" should be chosen before any are stolen. + partitionsToOwn.length.should.equal(4, "should have claimed half of the partitions."); + partitionsToOwn + .slice(0, 3) + .should.deep.equal(["0", "1", "2"], "should have claimed unclaimed partitions first."); + }); + + it("don't steal when you can just wait", () => { + // @chradek's case: let's say we have this partition layout: + // AAAABBBCCD + // + // Before, we'd let 'C' steal from 'A' - we see that we don't have enough + // +1 processors(exact match) and so 'C' attempts to become one. This can + // lead to some unnecessary thrash as 'A' loses partitions to a processor + // that has technically already met it's quota. + // + // Instead, we treat 'A' is a +1-ish specifically for when we ('C') + // are checking if we want to grab more partitions. + // + // This allows 'A' to just naturally decline as _actual_ processors grab + // their minimum required partitions rather than forcing it and possibly + // having a partition have to juggle between partitions as they try to + // meet the minimum. + const partitions = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]; + + const lb = new BalancedLoadBalancingStrategy(1000 * 60); + + // we'll do 4 consumers + const initialOwnershipMap = createOwnershipMap({ + "0": "a", + "1": "a", + "2": "a", + "3": "a", + + "4": "b", + "5": "b", + "6": "b", + + "7": "c", + "8": "c", + + "9": "d" + }); + + const requestedPartitions = lb.getPartitionsToCliam("c", initialOwnershipMap, partitions); + requestedPartitions.sort(); + + requestedPartitions.should.deep.equal( + [], + "c will not steal one partition since it sees that, eventually, 'a' will lose its partitions and become a +1 processor on it's own" + ); + }); + + it("avoid thrash", () => { + // this is a case where we shouldn't steal - we have + // the minimum number of partitions and stealing at this + // point will just keep thrashing both processors. + const partitionsToOwn = lb.getPartitionsToCliam( + "b", + createOwnershipMap({ + "0": "a", + "1": "b", + "2": "a" + }), + ["0", "1", "2"] + ); + + partitionsToOwn.sort(); + partitionsToOwn.should.deep.equal([], "should not re-steal when things are balanced"); + }); + + it("general cases", () => { + const allPartitions = ["0", "1", "2", "3"]; + + // in the presence of no owners we claim a random partition + let partitionsToOwn = lb.getPartitionsToCliam("a", createOwnershipMap({}), allPartitions); + partitionsToOwn.length.should.be.equal(4, "nothing is owned, claim all"); + + // if there are other owners we should claim up to #partitions/#owners + partitionsToOwn = lb.getPartitionsToCliam( + "a", + createOwnershipMap({ + "1": "b", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.length.should.be.equal(1, "1 and 1 with another owner, should claim one"); + // better not try to claim 'b's partition when there are unowned partitions + partitionsToOwn.filter((p) => p === "1").length.should.equal(0); + + // 'b' should claim the last unowned partition + partitionsToOwn = lb.getPartitionsToCliam( + "b", + createOwnershipMap({ + "1": "b", + "2": "a", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal(["0"], "b grabbed the last available partition"); + + // we're balanced - processors now only grab the partitions that they own + partitionsToOwn = lb.getPartitionsToCliam( + "b", + createOwnershipMap({ + "0": "b", + "1": "a", + "2": "b", + "3": "a" + }), + allPartitions + ); + partitionsToOwn.sort(); + partitionsToOwn.should.be.deep.equal([], "balanced: b should not grab anymore partitions"); + }); + + it("honors the partitionOwnershipExpirationIntervalInMs", () => { + const intervalInMs = 1000; + const lb = new GreedyLoadBalancingStrategy(intervalInMs); + const allPartitions = ["0", "1", "2", "3"]; + const ownershipMap = createOwnershipMap({ + "0": "b", + "1": "a" + }); + + // At this point, "a" should only grab 1 partition since both "a" and "b" should end up with 2 partitions each. + let partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + partitionsToOwn.length.should.equal(1, "Expected to claim 1 new partitions."); + + // Change the ownership of partition "0" so it is older than the interval. + const ownership = ownershipMap.get("0")!; + ownership.lastModifiedTimeInMs = Date.now() - (intervalInMs + 1); // Add 1 to the interval to ensure it has just expired. + + // At this point, "a" should grab partitions 0, 2, and 3. + // This is because "b" only owned 1 partition and that claim is expired, + // so "a" as treated as if it is the only owner. + partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + partitionsToOwn.sort(); + partitionsToOwn.should.deep.equal(["0", "2", "3"]); + }); + }); +});