Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hub] Update Processor & Pump to not use EventHubClient #9600

Merged
merged 5 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class EventHubConsumerClient {
get eventHubName(): string;
get fullyQualifiedNamespace(): string;
getEventHubProperties(options?: GetEventHubPropertiesOptions): Promise<EventHubProperties>;
getPartitionIds(options?: GetPartitionIdsOptions): Promise<string[]>;
getPartitionIds(options?: GetPartitionIdsOptions): Promise<Array<string>>;
getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise<PartitionProperties>;
subscribe(handlers: SubscriptionEventHandlers, options?: SubscribeOptions): Subscription;
subscribe(partitionId: string, handlers: SubscriptionEventHandlers, options?: SubscribeOptions): Subscription;
Expand All @@ -112,7 +112,7 @@ export class EventHubProducerClient {
getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise<PartitionProperties>;
sendBatch(batch: EventData[], options?: SendBatchOptions): Promise<void>;
sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise<void>;
}
}

// @public
export interface EventHubProperties {
Expand Down
77 changes: 76 additions & 1 deletion sdk/eventhub/event-hubs/src/connectionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import {
CreateConnectionContextBaseParameters,
EventHubConnectionConfig,
SharedKeyCredential,
TokenCredential
TokenCredential,
isTokenCredential,
parseConnectionString,
EventHubConnectionStringModel,
ConnectionConfig
} from "@azure/core-amqp";
import { ManagementClient, ManagementClientOptions } from "./managementClient";
import { EventHubClientOptions } from "./models/public";
Expand Down Expand Up @@ -426,3 +430,74 @@ export namespace ConnectionContext {
return connectionContext;
}
}

/**
* Helper method to create a ConnectionContext from the input passed to either
* EventHubProducerClient or EventHubConsumerClient constructors
*
* @ignore
* @internal
*/
export function createConnectionContext(
hostOrConnectionString: string,
eventHubNameOrOptions?: string | EventHubClientOptions,
credentialOrOptions?: TokenCredential | EventHubClientOptions,
options?: EventHubClientOptions
): ConnectionContext {
let connectionString;
let config;
let credential: TokenCredential | SharedKeyCredential;
hostOrConnectionString = String(hostOrConnectionString);

if (!isTokenCredential(credentialOrOptions)) {
const parsedCS = parseConnectionString<EventHubConnectionStringModel>(hostOrConnectionString);
if (
!(parsedCS.EntityPath || (typeof eventHubNameOrOptions === "string" && eventHubNameOrOptions))
) {
throw new TypeError(
`Either provide "eventHubName" or the "connectionString": "${hostOrConnectionString}", ` +
`must contain "EntityPath=<your-event-hub-name>".`
);
}
if (
parsedCS.EntityPath &&
typeof eventHubNameOrOptions === "string" &&
eventHubNameOrOptions &&
parsedCS.EntityPath !== eventHubNameOrOptions
) {
throw new TypeError(
`The entity path "${parsedCS.EntityPath}" in connectionString: "${hostOrConnectionString}" ` +
`doesn't match with eventHubName: "${eventHubNameOrOptions}".`
);
}
connectionString = hostOrConnectionString;
if (typeof eventHubNameOrOptions !== "string") {
// connectionstring and/or options were passed to constructor
config = EventHubConnectionConfig.create(connectionString);
options = eventHubNameOrOptions;
} else {
// connectionstring, eventHubName and/or options were passed to constructor
const eventHubName = eventHubNameOrOptions;
config = EventHubConnectionConfig.create(connectionString, eventHubName);
options = credentialOrOptions;
}
// Since connectionstring was passed, create a SharedKeyCredential
credential = new SharedKeyCredential(config.sharedAccessKeyName, config.sharedAccessKey);
} else {
// host, eventHubName, a TokenCredential and/or options were passed to constructor
const eventHubName = eventHubNameOrOptions;
let host = hostOrConnectionString;
credential = credentialOrOptions;
if (!eventHubName) {
throw new TypeError(`"eventHubName" is missing`);
}

if (!host.endsWith("/")) host += "/";
connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue;EntityPath=${eventHubName}`;
config = EventHubConnectionConfig.create(connectionString);
}

ConnectionConfig.validate(config);

return ConnectionContext.create(config, credential, options);
}
99 changes: 54 additions & 45 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { EventHubClient } from "./impl/eventHubClient";
import { ConnectionContext, createConnectionContext } from "./connectionContext";
import {
EventHubClientOptions,
GetEventHubPropertiesOptions,
Expand Down Expand Up @@ -51,7 +51,14 @@ const defaultConsumerClientOptions: Required<Pick<
* to load balance multiple instances of your application.
*/
export class EventHubConsumerClient {
private _eventHubClient: EventHubClient;
/**
* Describes the amqp connection context for the client.
*/
private _context: ConnectionContext;
/**
* The options passed by the user when creating the EventHubClient instance.
*/
private _clientOptions: EventHubClientOptions;
private _partitionGate = new PartitionGate();
private _id = uuid();

Expand All @@ -77,7 +84,7 @@ export class EventHubConsumerClient {
* The name of the Event Hub instance for which this client is created.
*/
get eventHubName(): string {
return this._eventHubClient.eventHubName;
return this._context.config.entityPath;
}

/**
Expand All @@ -87,7 +94,7 @@ export class EventHubConsumerClient {
* This is likely to be similar to <yournamespace>.servicebus.windows.net.
*/
get fullyQualifiedNamespace(): string {
return this._eventHubClient.fullyQualifiedNamespace;
return this._context.config.host;
}

/**
Expand Down Expand Up @@ -239,73 +246,65 @@ export class EventHubConsumerClient {
// #3 or 3.1
logger.info("Creating EventHubConsumerClient with TokenCredential.");

let eventHubClientOptions: EventHubClientOptions | undefined;

if (isCheckpointStore(checkpointStoreOrOptions5)) {
// 3.1
this._checkpointStore = checkpointStoreOrOptions5;
this._userChoseCheckpointStore = true;
eventHubClientOptions = options6;
this._clientOptions = options6 || {};
} else {
this._checkpointStore = new InMemoryCheckpointStore();
this._userChoseCheckpointStore = false;
eventHubClientOptions = checkpointStoreOrOptions5;
this._clientOptions = checkpointStoreOrOptions5 || {};
}

this._eventHubClient = new EventHubClient(
this._context = createConnectionContext(
connectionStringOrFullyQualifiedNamespace2,
checkpointStoreOrEventHubNameOrOptions3 as string,
checkpointStoreOrCredentialOrOptions4,
eventHubClientOptions
this._clientOptions
);
} else if (typeof checkpointStoreOrEventHubNameOrOptions3 === "string") {
// #2 or 2.1
logger.info("Creating EventHubConsumerClient with connection string and event hub name.");

let eventHubClientOptions: EventHubClientOptions | undefined;

if (isCheckpointStore(checkpointStoreOrCredentialOrOptions4)) {
// 2.1
this._checkpointStore = checkpointStoreOrCredentialOrOptions4;
this._userChoseCheckpointStore = true;
eventHubClientOptions = checkpointStoreOrOptions5 as EventHubClientOptions | undefined;
this._clientOptions = (checkpointStoreOrOptions5 as EventHubClientOptions) || {};
} else {
// 2
this._checkpointStore = new InMemoryCheckpointStore();
this._userChoseCheckpointStore = false;
eventHubClientOptions = checkpointStoreOrCredentialOrOptions4;
this._clientOptions = checkpointStoreOrCredentialOrOptions4 || {};
}

this._eventHubClient = new EventHubClient(
this._context = createConnectionContext(
connectionStringOrFullyQualifiedNamespace2,
checkpointStoreOrEventHubNameOrOptions3,
eventHubClientOptions as EventHubClientOptions
this._clientOptions
);
} else {
// #1 or 1.1
logger.info("Creating EventHubConsumerClient with connection string.");

let eventHubClientOptions: EventHubClientOptions | undefined;

if (isCheckpointStore(checkpointStoreOrEventHubNameOrOptions3)) {
// 1.1
this._checkpointStore = checkpointStoreOrEventHubNameOrOptions3;
this._userChoseCheckpointStore = true;
eventHubClientOptions = checkpointStoreOrCredentialOrOptions4 as
| EventHubClientOptions
| undefined;
this._clientOptions =
(checkpointStoreOrCredentialOrOptions4 as EventHubClientOptions) || {};
} else {
// 1
this._checkpointStore = new InMemoryCheckpointStore();
this._userChoseCheckpointStore = false;
eventHubClientOptions = checkpointStoreOrEventHubNameOrOptions3 as
| EventHubClientOptions
| undefined;
this._clientOptions =
(checkpointStoreOrEventHubNameOrOptions3 as EventHubClientOptions) || {};
}

this._eventHubClient = new EventHubClient(
this._context = createConnectionContext(
connectionStringOrFullyQualifiedNamespace2,
eventHubClientOptions
this._clientOptions
);
}
}
Expand All @@ -324,8 +323,8 @@ export class EventHubConsumerClient {
return subscription.close();
})
);
// Close the connection via the client.
return this._eventHubClient.close();
// Close the connection via the connection context.
return this._context.close();
}

/**
Expand All @@ -336,8 +335,15 @@ export class EventHubConsumerClient {
* @throws Error if the underlying connection has been closed, create a new EventHubConsumerClient.
* @throws AbortError if the operation is cancelled via the abortSignal.
*/
getPartitionIds(options: GetPartitionIdsOptions = {}): Promise<string[]> {
return this._eventHubClient.getPartitionIds(options);
getPartitionIds(options: GetPartitionIdsOptions = {}): Promise<Array<string>> {
return this._context
.managementSession!.getEventHubProperties({
...options,
retryOptions: this._clientOptions.retryOptions
})
.then((eventHubProperties) => {
return eventHubProperties.partitionIds;
});
}

/**
Expand All @@ -352,7 +358,10 @@ export class EventHubConsumerClient {
partitionId: string,
options: GetPartitionPropertiesOptions = {}
): Promise<PartitionProperties> {
return this._eventHubClient.getPartitionProperties(partitionId, options);
return this._context.managementSession!.getPartitionProperties(partitionId, {
...options,
retryOptions: this._clientOptions.retryOptions
});
}

/**
Expand All @@ -363,7 +372,10 @@ export class EventHubConsumerClient {
* @throws AbortError if the operation is cancelled via the abortSignal.
*/
getEventHubProperties(options: GetEventHubPropertiesOptions = {}): Promise<EventHubProperties> {
return this._eventHubClient.getProperties(options);
return this._context.managementSession!.getEventHubProperties({
...options,
retryOptions: this._clientOptions.retryOptions
});
}

/**
Expand Down Expand Up @@ -421,9 +433,7 @@ export class EventHubConsumerClient {
handlersOrPartitionId1,
options
));
} else if (
isSubscriptionEventHandlers(optionsOrHandlers2)
) {
} else if (isSubscriptionEventHandlers(optionsOrHandlers2)) {
// #2: subscribe overload (read from specific partition IDs), don't coordinate
const options = possibleOptions3 as SubscribeOptions | undefined;
if (options && options.startPosition) {
Expand Down Expand Up @@ -471,8 +481,7 @@ export class EventHubConsumerClient {
}

const eventProcessor = this._createEventProcessor(
this._consumerGroup,
this._eventHubClient,
this._context,
subscriptionEventHandlers,
this._checkpointStore,
{
Expand All @@ -484,7 +493,8 @@ export class EventHubConsumerClient {
: new GreedyPartitionLoadBalancer(),
// make it so all the event processors process work with the same overarching owner ID
// this allows the EventHubConsumer to unify all the work for any processors that it spawns
ownerId: this._id
ownerId: this._id,
retryOptions: this._clientOptions.retryOptions
}
);

Expand All @@ -511,31 +521,30 @@ export class EventHubConsumerClient {
}

const eventProcessor = this._createEventProcessor(
this._consumerGroup,
this._eventHubClient,
this._context,
eventHandlers,
this._checkpointStore,
{
...defaultConsumerClientOptions,
...options,
processingTarget: partitionId,
ownerLevel: getOwnerLevel(subscribeOptions, this._userChoseCheckpointStore)
ownerLevel: getOwnerLevel(subscribeOptions, this._userChoseCheckpointStore),
retryOptions: this._clientOptions.retryOptions
}
);

return { targetedPartitionId: partitionId, eventProcessor };
}

private _createEventProcessor(
consumerGroup: string,
eventHubClient: EventHubClient,
connectionContext: ConnectionContext,
subscriptionEventHandlers: SubscriptionEventHandlers,
checkpointStore: CheckpointStore,
options: FullEventProcessorOptions
) {
return new EventProcessor(
consumerGroup,
eventHubClient,
this._consumerGroup,
connectionContext,
subscriptionEventHandlers,
checkpointStore,
options
Expand Down
3 changes: 1 addition & 2 deletions sdk/eventhub/event-hubs/src/eventHubProducerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
import { isTokenCredential, TokenCredential } from "@azure/core-amqp";
import { getTracer } from "@azure/core-tracing";
import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { ConnectionContext } from "./connectionContext";
import { ConnectionContext, createConnectionContext } from "./connectionContext";
import { instrumentEventData, TRACEPARENT_PROPERTY } from "./diagnostics/instrumentEventData";
import { createMessageSpan } from "./diagnostics/messageSpan";
import { EventData } from "./eventData";
import { EventDataBatch, EventDataBatchImpl, isEventDataBatch } from "./eventDataBatch";
import { EventHubSender } from './eventHubSender';
import { createConnectionContext } from "./impl/eventHubClient";
import { logErrorStackTrace, logger } from './log';
import { EventHubProperties, PartitionProperties } from "./managementClient";
import {
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from "rhea-promise";
import { Constants, MessagingError, delay, translate } from "@azure/core-amqp";
import { EventDataInternal, ReceivedEventData, fromAmqpMessage } from "./eventData";
import { EventHubConsumerOptions } from "./impl/eventHubClient";
import { EventHubConsumerOptions } from "./models/private";
import { ConnectionContext } from "./connectionContext";
import { LinkEntity } from "./linkEntity";
import { EventPosition, getEventPositionFilter } from "./eventPosition";
Expand Down
Loading