Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[event-hubs] adds loadBalancingOptions and greedy load balancing strategy #9706

Merged
merged 31 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
faad867
[event-hubs] adds EventHubConsumerClientOptions
chradek Jun 19, 2020
daa5920
fix conflicts from rebase
chradek Jun 23, 2020
6b1e029
refactor load balancer to get all claimable partitions
chradek Jun 23, 2020
13cb669
add balanced and greedy load balancer strategies
chradek Jun 23, 2020
5a0a56e
add unbalancedLoadBalancingStrategy
chradek Jun 23, 2020
aed4006
update pumpManager to expose receivingFromPartitions
chradek Jun 23, 2020
c3656e2
updates EventProcessor to use LoadBalancingStrategies
chradek Jun 24, 2020
f2205b8
add tests for the partitionOwnershipExpirationIntervalInMs
chradek Jun 24, 2020
ed1ff3b
add functional load balancing tests
chradek Jun 25, 2020
793d95d
update docs
chradek Jun 25, 2020
d5f15e5
update version to 5.3.0-preview.1
chradek Jun 25, 2020
961795e
add changelog
chradek Jun 25, 2020
41202b5
update pnpm-lock
chradek Jun 25, 2020
dc03b06
address feedback
chradek Jun 25, 2020
f67b005
add explicity existance check to partitionOwnership.lastModifiedTimeInMs
chradek Jun 29, 2020
82828f1
trashing -> thrashing
chradek Jun 29, 2020
9a97ab4
be smarter about else-if statements
chradek Jun 29, 2020
0a6a354
explain the magic number 6
chradek Jun 29, 2020
ceb7c9a
identifyPartitionsToClaim -> getPartitionsToClaim
chradek Jun 29, 2020
62add8a
identifyClaimablePartitions -> listAvailablePartitions
chradek Jun 29, 2020
a313b66
add better summary for EventHubConsumerClientOptions
chradek Jun 29, 2020
5bb4827
add better docs around greedy and balanced strategies
chradek Jun 29, 2020
2420c2b
remove superfluous doc from CommonEventProcessorOptions
chradek Jun 29, 2020
f203204
add comment around why we have abandoned partitions
chradek Jun 29, 2020
e1a9fdb
throw AbortError instead of silent return
chradek Jun 29, 2020
12c3cd6
remove unneeded receivingFrom method
chradek Jun 29, 2020
7d84e15
Merge remote-tracking branch 'upstream/master' into eh-load-balance-v2
chradek Jun 29, 2020
c2c31a7
update pnpm-lock.yaml
chradek Jun 29, 2020
ea14b6e
Merge remote-tracking branch 'upstream/master' into eh-load-balance-v2
chradek Jul 1, 2020
db43e37
rush update
chradek Jul 1, 2020
52e3e0d
Merge remote-tracking branch 'upstream/master' into eh-load-balance-v2
chradek Jul 2, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion common/config/rush/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 5.3.0-preview.1 (Unreleased)

- Adds `loadBalancingOptions` to the `EventHubConsumerClient` to add control around
how aggressively the client claims partitions while load balancing.
([PR 9706](https://github.com/Azure/azure-sdk-for-js/pull/9706))

## 5.2.2 (2020-06-30)

- Fixes issue [#9289](https://github.com/Azure/azure-sdk-for-js/issues/9289)
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/event-hubs",
"sdk-type": "client",
"version": "5.2.2",
"version": "5.3.0-preview.1",
"description": "Azure Event Hubs SDK for JS.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down
24 changes: 18 additions & 6 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ export interface EventHubClientOptions {

// @public
export class EventHubConsumerClient {
constructor(consumerGroup: string, connectionString: string, options?: EventHubClientOptions);
constructor(consumerGroup: string, connectionString: string, checkpointStore: CheckpointStore, options?: EventHubClientOptions);
constructor(consumerGroup: string, connectionString: string, eventHubName: string, options?: EventHubClientOptions);
constructor(consumerGroup: string, connectionString: string, eventHubName: string, checkpointStore: CheckpointStore, options?: EventHubClientOptions);
constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, options?: EventHubClientOptions);
constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, checkpointStore: CheckpointStore, options?: EventHubClientOptions);
constructor(consumerGroup: string, connectionString: string, options?: EventHubConsumerClientOptions);
constructor(consumerGroup: string, connectionString: string, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions);
constructor(consumerGroup: string, connectionString: string, eventHubName: string, options?: EventHubConsumerClientOptions);
constructor(consumerGroup: string, connectionString: string, eventHubName: string, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions);
constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, options?: EventHubConsumerClientOptions);
constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions);
close(): Promise<void>;
static defaultConsumerGroupName: string;
get eventHubName(): string;
Expand All @@ -98,6 +98,11 @@ export class EventHubConsumerClient {
subscribe(partitionId: string, handlers: SubscriptionEventHandlers, options?: SubscribeOptions): Subscription;
}

// @public
export interface EventHubConsumerClientOptions extends EventHubClientOptions {
loadBalancingOptions?: LoadBalancingOptions;
}

// @public
export class EventHubProducerClient {
constructor(connectionString: string, options?: EventHubClientOptions);
Expand Down Expand Up @@ -152,6 +157,13 @@ export interface LastEnqueuedEventProperties {
// @public
export const latestEventPosition: EventPosition;

// @public
export interface LoadBalancingOptions {
partitionOwnershipExpirationIntervalInMs?: number;
strategy?: "balanced" | "greedy";
updateIntervalInMs?: number;
}

// @public
export const logger: import("@azure/logger").AzureLogger;

Expand Down
91 changes: 69 additions & 22 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

import { ConnectionContext, createConnectionContext } from "./connectionContext";
import {
EventHubClientOptions,
EventHubConsumerClientOptions,
GetEventHubPropertiesOptions,
GetPartitionIdsOptions,
GetPartitionPropertiesOptions
GetPartitionPropertiesOptions,
LoadBalancingOptions
} from "./models/public";
import { InMemoryCheckpointStore } from "./inMemoryCheckpointStore";
import { CheckpointStore, EventProcessor, FullEventProcessorOptions } from "./eventProcessor";
import { GreedyPartitionLoadBalancer } from "./partitionLoadBalancer";
import { Constants, TokenCredential } from "@azure/core-amqp";
import { logger } from "./log";

Expand All @@ -24,6 +24,10 @@ import { EventHubProperties, PartitionProperties } from "./managementClient";
import { PartitionGate } from "./impl/partitionGate";
import { v4 as uuid } from "uuid";
import { validateEventPositions } from "./eventPosition";
import { LoadBalancingStrategy } from "./loadBalancerStrategies/loadBalancingStrategy";
import { UnbalancedLoadBalancingStrategy } from "./loadBalancerStrategies/unbalancedStrategy";
import { GreedyLoadBalancingStrategy } from "./loadBalancerStrategies/greedyStrategy";
import { BalancedLoadBalancingStrategy } from "./loadBalancerStrategies/balancedStrategy";

const defaultConsumerClientOptions: Required<Pick<
FullEventProcessorOptions,
Expand Down Expand Up @@ -58,7 +62,7 @@ export class EventHubConsumerClient {
/**
* The options passed by the user when creating the EventHubClient instance.
*/
private _clientOptions: EventHubClientOptions;
private _clientOptions: EventHubConsumerClientOptions;
private _partitionGate = new PartitionGate();
private _id = uuid();

Expand All @@ -78,6 +82,11 @@ export class EventHubConsumerClient {
private _checkpointStore: CheckpointStore;
private _userChoseCheckpointStore: boolean;

/**
* Options for configuring load balancing.
*/
private readonly _loadBalancingOptions: Required<LoadBalancingOptions>;

/**
* @property
* @readonly
Expand Down Expand Up @@ -111,7 +120,11 @@ export class EventHubConsumerClient {
* - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets.
* - `userAgent` : A string to append to the built in user agent string that is passed to the service.
*/
constructor(consumerGroup: string, connectionString: string, options?: EventHubClientOptions); // #1
constructor(
consumerGroup: string,
connectionString: string,
options?: EventHubConsumerClientOptions
); // #1
/**
* @constructor
* The `EventHubConsumerClient` class is used to consume events from an Event Hub.
Expand All @@ -133,7 +146,7 @@ export class EventHubConsumerClient {
consumerGroup: string,
connectionString: string,
checkpointStore: CheckpointStore,
options?: EventHubClientOptions
options?: EventHubConsumerClientOptions
); // #1.1
/**
* @constructor
Expand All @@ -154,7 +167,7 @@ export class EventHubConsumerClient {
consumerGroup: string,
connectionString: string,
eventHubName: string,
options?: EventHubClientOptions
options?: EventHubConsumerClientOptions
); // #2
/**
* @constructor
Expand All @@ -179,7 +192,7 @@ export class EventHubConsumerClient {
connectionString: string,
eventHubName: string,
checkpointStore: CheckpointStore,
options?: EventHubClientOptions
options?: EventHubConsumerClientOptions
); // #2.1
/**
* @constructor
Expand All @@ -202,7 +215,7 @@ export class EventHubConsumerClient {
fullyQualifiedNamespace: string,
eventHubName: string,
credential: TokenCredential,
options?: EventHubClientOptions
options?: EventHubConsumerClientOptions
); // #3
/**
* @constructor
Expand All @@ -229,18 +242,21 @@ export class EventHubConsumerClient {
eventHubName: string,
credential: TokenCredential,
checkpointStore: CheckpointStore,
options?: EventHubClientOptions
options?: EventHubConsumerClientOptions
); // #3.1
constructor(
private _consumerGroup: string,
connectionStringOrFullyQualifiedNamespace2: string,
checkpointStoreOrEventHubNameOrOptions3?: CheckpointStore | EventHubClientOptions | string,
checkpointStoreOrEventHubNameOrOptions3?:
| CheckpointStore
| EventHubConsumerClientOptions
| string,
checkpointStoreOrCredentialOrOptions4?:
| CheckpointStore
| EventHubClientOptions
| EventHubConsumerClientOptions
| TokenCredential,
checkpointStoreOrOptions5?: CheckpointStore | EventHubClientOptions,
options6?: EventHubClientOptions
checkpointStoreOrOptions5?: CheckpointStore | EventHubConsumerClientOptions,
options6?: EventHubConsumerClientOptions
) {
if (isTokenCredential(checkpointStoreOrCredentialOrOptions4)) {
// #3 or 3.1
Expand Down Expand Up @@ -271,7 +287,7 @@ export class EventHubConsumerClient {
// 2.1
this._checkpointStore = checkpointStoreOrCredentialOrOptions4;
this._userChoseCheckpointStore = true;
this._clientOptions = (checkpointStoreOrOptions5 as EventHubClientOptions) || {};
this._clientOptions = (checkpointStoreOrOptions5 as EventHubConsumerClientOptions) || {};
} else {
// 2
this._checkpointStore = new InMemoryCheckpointStore();
Expand All @@ -293,20 +309,28 @@ export class EventHubConsumerClient {
this._checkpointStore = checkpointStoreOrEventHubNameOrOptions3;
this._userChoseCheckpointStore = true;
this._clientOptions =
(checkpointStoreOrCredentialOrOptions4 as EventHubClientOptions) || {};
(checkpointStoreOrCredentialOrOptions4 as EventHubConsumerClientOptions) || {};
} else {
// 1
this._checkpointStore = new InMemoryCheckpointStore();
this._userChoseCheckpointStore = false;
this._clientOptions =
(checkpointStoreOrEventHubNameOrOptions3 as EventHubClientOptions) || {};
(checkpointStoreOrEventHubNameOrOptions3 as EventHubConsumerClientOptions) || {};
}

this._context = createConnectionContext(
connectionStringOrFullyQualifiedNamespace2,
this._clientOptions
);
}
this._loadBalancingOptions = {
// default options
strategy: "balanced",
updateIntervalInMs: 10000,
partitionOwnershipExpirationIntervalInMs: 60000,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 6 * updateIntervalInMs better than 60000 if a user passes in only updateIntervalInMs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. In the API review, we specifically did not want to default the partitionOwnershipExpirationIntervalIsMs by some multiple of the updateIntervalInMs

// options supplied by user
...this._clientOptions?.loadBalancingOptions
};
}

/**
Expand Down Expand Up @@ -466,6 +490,27 @@ export class EventHubConsumerClient {
return subscription;
}

/**
* Gets the LoadBalancing strategy that should be used based on what the user provided.
*/
private _getLoadBalancingStrategy(): LoadBalancingStrategy {
if (!this._userChoseCheckpointStore) {
// The default behavior when a checkpointstore isn't provided
// is to always grab all the partitions.
return new UnbalancedLoadBalancingStrategy();
}

const partitionOwnershipExpirationIntervalInMs = this._loadBalancingOptions
.partitionOwnershipExpirationIntervalInMs;
if (this._loadBalancingOptions?.strategy === "greedy") {
return new GreedyLoadBalancingStrategy(partitionOwnershipExpirationIntervalInMs);
}

// The default behavior when a checkpointstore is provided is
// to grab one partition at a time.
return new BalancedLoadBalancingStrategy(partitionOwnershipExpirationIntervalInMs);
}

private createEventProcessorForAllPartitions(
subscriptionEventHandlers: SubscriptionEventHandlers,
options?: SubscribeOptions
Expand All @@ -480,6 +525,7 @@ export class EventHubConsumerClient {
logger.verbose("EventHubConsumerClient subscribing to all partitions, no checkpoint store.");
}

const loadBalancingStrategy = this._getLoadBalancingStrategy();
const eventProcessor = this._createEventProcessor(
this._context,
subscriptionEventHandlers,
Expand All @@ -488,13 +534,12 @@ export class EventHubConsumerClient {
...defaultConsumerClientOptions,
...(options as SubscribeOptions),
ownerLevel: getOwnerLevel(options, this._userChoseCheckpointStore),
processingTarget: this._userChoseCheckpointStore
? undefined
: new GreedyPartitionLoadBalancer(),
// make it so all the event processors process work with the same overarching owner ID
// this allows the EventHubConsumer to unify all the work for any processors that it spawns
ownerId: this._id,
retryOptions: this._clientOptions.retryOptions
retryOptions: this._clientOptions.retryOptions,
loadBalancingStrategy,
loopIntervalInMs: this._loadBalancingOptions.updateIntervalInMs
}
);

Expand Down Expand Up @@ -529,7 +574,9 @@ export class EventHubConsumerClient {
...options,
processingTarget: partitionId,
ownerLevel: getOwnerLevel(subscribeOptions, this._userChoseCheckpointStore),
retryOptions: this._clientOptions.retryOptions
retryOptions: this._clientOptions.retryOptions,
loadBalancingStrategy: new UnbalancedLoadBalancingStrategy(),
loopIntervalInMs: this._loadBalancingOptions.updateIntervalInMs ?? 10000
}
);

Expand Down
Loading