Skip to content
This repository has been archived by the owner on Apr 8, 2023. It is now read-only.

Commit

Permalink
[Event Hub] Update ConsumerClient, Processor, PumpManager & Pump to n…
Browse files Browse the repository at this point in the history
…ot use EventHubClient (Azure#9600)
  • Loading branch information
ramya-rao-a authored and sadasant committed Jun 27, 2020
1 parent babd011 commit b25f53d
Show file tree
Hide file tree
Showing 16 changed files with 698 additions and 1,299 deletions.
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class EventHubConsumerClient {
get eventHubName(): string;
get fullyQualifiedNamespace(): string;
getEventHubProperties(options?: GetEventHubPropertiesOptions): Promise<EventHubProperties>;
getPartitionIds(options?: GetPartitionIdsOptions): Promise<string[]>;
getPartitionIds(options?: GetPartitionIdsOptions): Promise<Array<string>>;
getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise<PartitionProperties>;
subscribe(handlers: SubscriptionEventHandlers, options?: SubscribeOptions): Subscription;
subscribe(partitionId: string, handlers: SubscriptionEventHandlers, options?: SubscribeOptions): Subscription;
Expand All @@ -112,7 +112,7 @@ export class EventHubProducerClient {
getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise<PartitionProperties>;
sendBatch(batch: EventData[], options?: SendBatchOptions): Promise<void>;
sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise<void>;
}
}

// @public
export interface EventHubProperties {
Expand Down
77 changes: 76 additions & 1 deletion sdk/eventhub/event-hubs/src/connectionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import {
CreateConnectionContextBaseParameters,
EventHubConnectionConfig,
SharedKeyCredential,
TokenCredential
TokenCredential,
isTokenCredential,
parseConnectionString,
EventHubConnectionStringModel,
ConnectionConfig
} from "@azure/core-amqp";
import { ManagementClient, ManagementClientOptions } from "./managementClient";
import { EventHubClientOptions } from "./models/public";
Expand Down Expand Up @@ -426,3 +430,74 @@ export namespace ConnectionContext {
return connectionContext;
}
}

/**
* Helper method to create a ConnectionContext from the input passed to either
* EventHubProducerClient or EventHubConsumerClient constructors
*
* @ignore
* @internal
*/
export function createConnectionContext(
hostOrConnectionString: string,
eventHubNameOrOptions?: string | EventHubClientOptions,
credentialOrOptions?: TokenCredential | EventHubClientOptions,
options?: EventHubClientOptions
): ConnectionContext {
let connectionString;
let config;
let credential: TokenCredential | SharedKeyCredential;
hostOrConnectionString = String(hostOrConnectionString);

if (!isTokenCredential(credentialOrOptions)) {
const parsedCS = parseConnectionString<EventHubConnectionStringModel>(hostOrConnectionString);
if (
!(parsedCS.EntityPath || (typeof eventHubNameOrOptions === "string" && eventHubNameOrOptions))
) {
throw new TypeError(
`Either provide "eventHubName" or the "connectionString": "${hostOrConnectionString}", ` +
`must contain "EntityPath=<your-event-hub-name>".`
);
}
if (
parsedCS.EntityPath &&
typeof eventHubNameOrOptions === "string" &&
eventHubNameOrOptions &&
parsedCS.EntityPath !== eventHubNameOrOptions
) {
throw new TypeError(
`The entity path "${parsedCS.EntityPath}" in connectionString: "${hostOrConnectionString}" ` +
`doesn't match with eventHubName: "${eventHubNameOrOptions}".`
);
}
connectionString = hostOrConnectionString;
if (typeof eventHubNameOrOptions !== "string") {
// connectionstring and/or options were passed to constructor
config = EventHubConnectionConfig.create(connectionString);
options = eventHubNameOrOptions;
} else {
// connectionstring, eventHubName and/or options were passed to constructor
const eventHubName = eventHubNameOrOptions;
config = EventHubConnectionConfig.create(connectionString, eventHubName);
options = credentialOrOptions;
}
// Since connectionstring was passed, create a SharedKeyCredential
credential = new SharedKeyCredential(config.sharedAccessKeyName, config.sharedAccessKey);
} else {
// host, eventHubName, a TokenCredential and/or options were passed to constructor
const eventHubName = eventHubNameOrOptions;
let host = hostOrConnectionString;
credential = credentialOrOptions;
if (!eventHubName) {
throw new TypeError(`"eventHubName" is missing`);
}

if (!host.endsWith("/")) host += "/";
connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue;EntityPath=${eventHubName}`;
config = EventHubConnectionConfig.create(connectionString);
}

ConnectionConfig.validate(config);

return ConnectionContext.create(config, credential, options);
}
99 changes: 54 additions & 45 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { EventHubClient } from "./impl/eventHubClient";
import { ConnectionContext, createConnectionContext } from "./connectionContext";
import {
EventHubClientOptions,
GetEventHubPropertiesOptions,
Expand Down Expand Up @@ -51,7 +51,14 @@ const defaultConsumerClientOptions: Required<Pick<
* to load balance multiple instances of your application.
*/
export class EventHubConsumerClient {
private _eventHubClient: EventHubClient;
/**
* Describes the amqp connection context for the client.
*/
private _context: ConnectionContext;
/**
* The options passed by the user when creating the EventHubClient instance.
*/
private _clientOptions: EventHubClientOptions;
private _partitionGate = new PartitionGate();
private _id = uuid();

Expand All @@ -77,7 +84,7 @@ export class EventHubConsumerClient {
* The name of the Event Hub instance for which this client is created.
*/
get eventHubName(): string {
return this._eventHubClient.eventHubName;
return this._context.config.entityPath;
}

/**
Expand All @@ -87,7 +94,7 @@ export class EventHubConsumerClient {
* This is likely to be similar to <yournamespace>.servicebus.windows.net.
*/
get fullyQualifiedNamespace(): string {
return this._eventHubClient.fullyQualifiedNamespace;
return this._context.config.host;
}

/**
Expand Down Expand Up @@ -239,73 +246,65 @@ export class EventHubConsumerClient {
// #3 or 3.1
logger.info("Creating EventHubConsumerClient with TokenCredential.");

let eventHubClientOptions: EventHubClientOptions | undefined;

if (isCheckpointStore(checkpointStoreOrOptions5)) {
// 3.1
this._checkpointStore = checkpointStoreOrOptions5;
this._userChoseCheckpointStore = true;
eventHubClientOptions = options6;
this._clientOptions = options6 || {};
} else {
this._checkpointStore = new InMemoryCheckpointStore();
this._userChoseCheckpointStore = false;
eventHubClientOptions = checkpointStoreOrOptions5;
this._clientOptions = checkpointStoreOrOptions5 || {};
}

this._eventHubClient = new EventHubClient(
this._context = createConnectionContext(
connectionStringOrFullyQualifiedNamespace2,
checkpointStoreOrEventHubNameOrOptions3 as string,
checkpointStoreOrCredentialOrOptions4,
eventHubClientOptions
this._clientOptions
);
} else if (typeof checkpointStoreOrEventHubNameOrOptions3 === "string") {
// #2 or 2.1
logger.info("Creating EventHubConsumerClient with connection string and event hub name.");

let eventHubClientOptions: EventHubClientOptions | undefined;

if (isCheckpointStore(checkpointStoreOrCredentialOrOptions4)) {
// 2.1
this._checkpointStore = checkpointStoreOrCredentialOrOptions4;
this._userChoseCheckpointStore = true;
eventHubClientOptions = checkpointStoreOrOptions5 as EventHubClientOptions | undefined;
this._clientOptions = (checkpointStoreOrOptions5 as EventHubClientOptions) || {};
} else {
// 2
this._checkpointStore = new InMemoryCheckpointStore();
this._userChoseCheckpointStore = false;
eventHubClientOptions = checkpointStoreOrCredentialOrOptions4;
this._clientOptions = checkpointStoreOrCredentialOrOptions4 || {};
}

this._eventHubClient = new EventHubClient(
this._context = createConnectionContext(
connectionStringOrFullyQualifiedNamespace2,
checkpointStoreOrEventHubNameOrOptions3,
eventHubClientOptions as EventHubClientOptions
this._clientOptions
);
} else {
// #1 or 1.1
logger.info("Creating EventHubConsumerClient with connection string.");

let eventHubClientOptions: EventHubClientOptions | undefined;

if (isCheckpointStore(checkpointStoreOrEventHubNameOrOptions3)) {
// 1.1
this._checkpointStore = checkpointStoreOrEventHubNameOrOptions3;
this._userChoseCheckpointStore = true;
eventHubClientOptions = checkpointStoreOrCredentialOrOptions4 as
| EventHubClientOptions
| undefined;
this._clientOptions =
(checkpointStoreOrCredentialOrOptions4 as EventHubClientOptions) || {};
} else {
// 1
this._checkpointStore = new InMemoryCheckpointStore();
this._userChoseCheckpointStore = false;
eventHubClientOptions = checkpointStoreOrEventHubNameOrOptions3 as
| EventHubClientOptions
| undefined;
this._clientOptions =
(checkpointStoreOrEventHubNameOrOptions3 as EventHubClientOptions) || {};
}

this._eventHubClient = new EventHubClient(
this._context = createConnectionContext(
connectionStringOrFullyQualifiedNamespace2,
eventHubClientOptions
this._clientOptions
);
}
}
Expand All @@ -324,8 +323,8 @@ export class EventHubConsumerClient {
return subscription.close();
})
);
// Close the connection via the client.
return this._eventHubClient.close();
// Close the connection via the connection context.
return this._context.close();
}

/**
Expand All @@ -336,8 +335,15 @@ export class EventHubConsumerClient {
* @throws Error if the underlying connection has been closed, create a new EventHubConsumerClient.
* @throws AbortError if the operation is cancelled via the abortSignal.
*/
getPartitionIds(options: GetPartitionIdsOptions = {}): Promise<string[]> {
return this._eventHubClient.getPartitionIds(options);
getPartitionIds(options: GetPartitionIdsOptions = {}): Promise<Array<string>> {
return this._context
.managementSession!.getEventHubProperties({
...options,
retryOptions: this._clientOptions.retryOptions
})
.then((eventHubProperties) => {
return eventHubProperties.partitionIds;
});
}

/**
Expand All @@ -352,7 +358,10 @@ export class EventHubConsumerClient {
partitionId: string,
options: GetPartitionPropertiesOptions = {}
): Promise<PartitionProperties> {
return this._eventHubClient.getPartitionProperties(partitionId, options);
return this._context.managementSession!.getPartitionProperties(partitionId, {
...options,
retryOptions: this._clientOptions.retryOptions
});
}

/**
Expand All @@ -363,7 +372,10 @@ export class EventHubConsumerClient {
* @throws AbortError if the operation is cancelled via the abortSignal.
*/
getEventHubProperties(options: GetEventHubPropertiesOptions = {}): Promise<EventHubProperties> {
return this._eventHubClient.getProperties(options);
return this._context.managementSession!.getEventHubProperties({
...options,
retryOptions: this._clientOptions.retryOptions
});
}

/**
Expand Down Expand Up @@ -421,9 +433,7 @@ export class EventHubConsumerClient {
handlersOrPartitionId1,
options
));
} else if (
isSubscriptionEventHandlers(optionsOrHandlers2)
) {
} else if (isSubscriptionEventHandlers(optionsOrHandlers2)) {
// #2: subscribe overload (read from specific partition IDs), don't coordinate
const options = possibleOptions3 as SubscribeOptions | undefined;
if (options && options.startPosition) {
Expand Down Expand Up @@ -471,8 +481,7 @@ export class EventHubConsumerClient {
}

const eventProcessor = this._createEventProcessor(
this._consumerGroup,
this._eventHubClient,
this._context,
subscriptionEventHandlers,
this._checkpointStore,
{
Expand All @@ -484,7 +493,8 @@ export class EventHubConsumerClient {
: new GreedyPartitionLoadBalancer(),
// make it so all the event processors process work with the same overarching owner ID
// this allows the EventHubConsumer to unify all the work for any processors that it spawns
ownerId: this._id
ownerId: this._id,
retryOptions: this._clientOptions.retryOptions
}
);

Expand All @@ -511,31 +521,30 @@ export class EventHubConsumerClient {
}

const eventProcessor = this._createEventProcessor(
this._consumerGroup,
this._eventHubClient,
this._context,
eventHandlers,
this._checkpointStore,
{
...defaultConsumerClientOptions,
...options,
processingTarget: partitionId,
ownerLevel: getOwnerLevel(subscribeOptions, this._userChoseCheckpointStore)
ownerLevel: getOwnerLevel(subscribeOptions, this._userChoseCheckpointStore),
retryOptions: this._clientOptions.retryOptions
}
);

return { targetedPartitionId: partitionId, eventProcessor };
}

private _createEventProcessor(
consumerGroup: string,
eventHubClient: EventHubClient,
connectionContext: ConnectionContext,
subscriptionEventHandlers: SubscriptionEventHandlers,
checkpointStore: CheckpointStore,
options: FullEventProcessorOptions
) {
return new EventProcessor(
consumerGroup,
eventHubClient,
this._consumerGroup,
connectionContext,
subscriptionEventHandlers,
checkpointStore,
options
Expand Down
3 changes: 1 addition & 2 deletions sdk/eventhub/event-hubs/src/eventHubProducerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
import { isTokenCredential, TokenCredential } from "@azure/core-amqp";
import { getTracer } from "@azure/core-tracing";
import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { ConnectionContext } from "./connectionContext";
import { ConnectionContext, createConnectionContext } from "./connectionContext";
import { instrumentEventData, TRACEPARENT_PROPERTY } from "./diagnostics/instrumentEventData";
import { createMessageSpan } from "./diagnostics/messageSpan";
import { EventData } from "./eventData";
import { EventDataBatch, EventDataBatchImpl, isEventDataBatch } from "./eventDataBatch";
import { EventHubSender } from './eventHubSender';
import { createConnectionContext } from "./impl/eventHubClient";
import { logErrorStackTrace, logger } from './log';
import { EventHubProperties, PartitionProperties } from "./managementClient";
import {
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from "rhea-promise";
import { Constants, MessagingError, delay, translate } from "@azure/core-amqp";
import { EventDataInternal, ReceivedEventData, fromAmqpMessage } from "./eventData";
import { EventHubConsumerOptions } from "./impl/eventHubClient";
import { EventHubConsumerOptions } from "./models/private";
import { ConnectionContext } from "./connectionContext";
import { LinkEntity } from "./linkEntity";
import { EventPosition, getEventPositionFilter } from "./eventPosition";
Expand Down
Loading

0 comments on commit b25f53d

Please sign in to comment.