Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Remove EventHubClient dependency in ProducerClient #9520

Merged
merged 5 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion sdk/eventhub/event-hubs/src/connectionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/* eslint-disable @azure/azure-sdk/ts-no-namespaces */
/* eslint-disable no-inner-declarations */

import { logger } from "./log";
import { logger, logErrorStackTrace } from "./log";
import { getRuntimeInfo } from "./util/runtimeInfo";
import { packageJsonInfo } from "./util/constants";
import { EventHubReceiver } from "./eventHubReceiver";
Expand Down Expand Up @@ -61,6 +61,10 @@ export interface ConnectionContext extends ConnectionContextBase {
* is in the process of closing or disconnecting.
*/
readyToOpenLink(): Promise<void>;
/**
* Closes all AMQP links, sessions and connection.
*/
close(): Promise<void>;
}

/**
Expand Down Expand Up @@ -214,6 +218,34 @@ export namespace ConnectionContext {
return waitForDisconnectPromise;
}
return Promise.resolve();
},
async close() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this feels like one of those things that should have always been here 😄

try {
if (this.connection.isOpen()) {
// Close all the senders.
for (const senderName of Object.keys(this.senders)) {
await this.senders[senderName].close();
}
// Close all the receivers.
for (const receiverName of Object.keys(this.receivers)) {
await this.receivers[receiverName].close();
}
// Close the cbs session;
await this.cbsSession.close();
// Close the management session
await this.managementSession!.close();
await this.connection.close();
this.wasConnectionCloseCalled = true;
logger.info("Closed the amqp connection '%s' on the client.", this.connectionId);
}
} catch (err) {
err = err instanceof Error ? err : JSON.stringify(err);
logger.warning(
`An error occurred while closing the connection "${this.connectionId}":\n${err}`
);
logErrorStackTrace(err);
throw err;
}
}
});

Expand Down
84 changes: 55 additions & 29 deletions sdk/eventhub/event-hubs/src/eventHubProducerClient.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { TokenCredential, isTokenCredential } from "@azure/core-amqp";
import { isTokenCredential, TokenCredential } from "@azure/core-amqp";
import { ConnectionContext } from "./connectionContext";
import { EventData } from "./eventData";
import { EventDataBatch, isEventDataBatch } from "./eventDataBatch";
import { EventHubClient } from "./impl/eventHubClient";
import { createConnectionContext } from "./impl/eventHubClient";
import { EventHubProperties, PartitionProperties } from "./managementClient";
import { EventHubProducer } from "./sender";
import {
CreateBatchOptions,
EventHubClientOptions,
Expand All @@ -14,7 +15,8 @@ import {
GetPartitionPropertiesOptions,
SendBatchOptions
} from "./models/public";
import { EventData } from "./eventData";
import { EventHubProducer } from "./sender";
import { throwErrorIfConnectionClosed } from "./util/error";
import { OperationOptions } from "./util/operationOptions";

/**
Expand All @@ -30,8 +32,18 @@ import { OperationOptions } from "./util/operationOptions";
*
*/
export class EventHubProducerClient {
private _client: EventHubClient;
/**
* Describes the amqp connection context for the client.
*/
private _context: ConnectionContext;

/**
* The options passed by the user when creating the EventHubClient instance.
*/
private _clientOptions: EventHubClientOptions;
/**
* Map of partitionId to producers
*/
private _producersMap: Map<string, EventHubProducer>;

/**
Expand All @@ -40,7 +52,7 @@ export class EventHubProducerClient {
* The name of the Event Hub instance for which this client is created.
*/
get eventHubName(): string {
return this._client.eventHubName;
return this._context.config.entityPath;
}

/**
Expand All @@ -50,7 +62,7 @@ export class EventHubProducerClient {
* This is likely to be similar to <yournamespace>.servicebus.windows.net.
*/
get fullyQualifiedNamespace(): string {
return this._client.fullyQualifiedNamespace;
return this._context.config.host;
}

/**
Expand Down Expand Up @@ -109,24 +121,18 @@ export class EventHubProducerClient {
credentialOrOptions3?: TokenCredential | EventHubClientOptions,
options4?: EventHubClientOptions
) {
this._context = createConnectionContext(
fullyQualifiedNamespaceOrConnectionString1,
eventHubNameOrOptions2,
credentialOrOptions3,
options4
);
if (typeof eventHubNameOrOptions2 !== "string") {
this._client = new EventHubClient(
fullyQualifiedNamespaceOrConnectionString1,
eventHubNameOrOptions2
);
this._clientOptions = eventHubNameOrOptions2 || {};
} else if (!isTokenCredential(credentialOrOptions3)) {
this._client = new EventHubClient(
fullyQualifiedNamespaceOrConnectionString1,
eventHubNameOrOptions2,
credentialOrOptions3
);
this._clientOptions = credentialOrOptions3 || {};
} else {
this._client = new EventHubClient(
fullyQualifiedNamespaceOrConnectionString1,
eventHubNameOrOptions2,
credentialOrOptions3,
options4
);
this._clientOptions = options4 || {};
}

this._producersMap = new Map();
Expand All @@ -147,14 +153,18 @@ export class EventHubProducerClient {
* @throws AbortError if the operation is cancelled via the abortSignal in the options.
*/
async createBatch(options?: CreateBatchOptions): Promise<EventDataBatch> {
throwErrorIfConnectionClosed(this._context);

if (options && options.partitionId && options.partitionKey) {
throw new Error("partitionId and partitionKey cannot both be set when creating a batch");
}

let producer = this._producersMap.get("");

if (!producer) {
producer = this._client.createProducer();
producer = new EventHubProducer(this._context, {
retryOptions: this._clientOptions.retryOptions
});
this._producersMap.set("", producer);
}

Expand Down Expand Up @@ -195,6 +205,8 @@ export class EventHubProducerClient {
batch: EventDataBatch | EventData[],
options: SendBatchOptions | OperationOptions = {}
): Promise<void> {
throwErrorIfConnectionClosed(this._context);

let partitionId: string | undefined;
let partitionKey: string | undefined;
if (isEventDataBatch(batch)) {
Expand Down Expand Up @@ -231,7 +243,8 @@ export class EventHubProducerClient {

let producer = this._producersMap.get(partitionId);
if (!producer) {
producer = this._client.createProducer({
producer = new EventHubProducer(this._context, {
retryOptions: this._clientOptions.retryOptions,
partitionId: partitionId === "" ? undefined : partitionId
});
this._producersMap.set(partitionId, producer);
Expand All @@ -246,7 +259,7 @@ export class EventHubProducerClient {
* @throws Error if the underlying connection encounters an error while closing.
*/
async close(): Promise<void> {
await this._client.close();
await this._context.close();

for (const pair of this._producersMap) {
await pair[1].close();
Expand All @@ -261,8 +274,13 @@ export class EventHubProducerClient {
* @throws Error if the underlying connection has been closed, create a new EventHubProducerClient.
* @throws AbortError if the operation is cancelled via the abortSignal.
*/
getEventHubProperties(options: GetEventHubPropertiesOptions = {}): Promise<EventHubProperties> {
return this._client.getProperties(options);
getEventHubProperties(
options: GetEventHubPropertiesOptions = {}
): Promise<EventHubProperties> {
return this._context.managementSession!.getEventHubProperties({
...options,
retryOptions: this._clientOptions.retryOptions
});
}

/**
Expand All @@ -274,7 +292,12 @@ export class EventHubProducerClient {
* @throws AbortError if the operation is cancelled via the abortSignal.
*/
getPartitionIds(options: GetPartitionIdsOptions = {}): Promise<Array<string>> {
return this._client.getPartitionIds(options);
return this._context.managementSession!.getEventHubProperties({
...options,
retryOptions: this._clientOptions.retryOptions
}).then(eventHubProperties => {
return eventHubProperties.partitionIds;
});
}

/**
Expand All @@ -289,6 +312,9 @@ export class EventHubProducerClient {
partitionId: string,
options: GetPartitionPropertiesOptions = {}
): Promise<PartitionProperties> {
return this._client.getPartitionProperties(partitionId, options);
return this._context.managementSession!.getPartitionProperties(partitionId, {
...options,
retryOptions: this._clientOptions.retryOptions
});
}
}
Loading