From a63505596a31e0624f94f51a36d977bb7d13307f Mon Sep 17 00:00:00 2001 From: Ramya Achutha Rao Date: Fri, 12 Jun 2020 19:16:18 -0700 Subject: [PATCH 1/5] [Event Hubs] Remove EventHubClient dependency in ProducerClient --- .../event-hubs/src/connectionContext.ts | 34 +- .../event-hubs/src/eventHubProducerClient.ts | 115 +++++-- .../event-hubs/src/impl/eventHubClient.ts | 310 +++++------------- .../event-hubs/src/managementClient.ts | 199 ++++++----- sdk/eventhub/event-hubs/test/client.spec.ts | 4 +- .../event-hubs/test/hubruntime.spec.ts | 18 +- .../event-hubs/test/node/disconnects.spec.ts | 6 +- 7 files changed, 338 insertions(+), 348 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index 6462361d8b8c..f6c61591dc0d 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -4,7 +4,7 @@ /* eslint-disable @azure/azure-sdk/ts-no-namespaces */ /* eslint-disable no-inner-declarations */ -import { logger } from "./log"; +import { logger, logErrorStackTrace } from "./log"; import { getRuntimeInfo } from "./util/runtimeInfo"; import { packageJsonInfo } from "./util/constants"; import { EventHubReceiver } from "./eventHubReceiver"; @@ -61,6 +61,10 @@ export interface ConnectionContext extends ConnectionContextBase { * is in the process of closing or disconnecting. */ readyToOpenLink(): Promise; + /** + * Closes all AMQP links, sessions and connection. + */ + close(): Promise; } /** @@ -214,6 +218,34 @@ export namespace ConnectionContext { return waitForDisconnectPromise; } return Promise.resolve(); + }, + async close() { + try { + if (this.connection.isOpen()) { + // Close all the senders. + for (const senderName of Object.keys(this.senders)) { + await this.senders[senderName].close(); + } + // Close all the receivers. + for (const receiverName of Object.keys(this.receivers)) { + await this.receivers[receiverName].close(); + } + // Close the cbs session; + await this.cbsSession.close(); + // Close the management session + await this.managementSession!.close(); + await this.connection.close(); + this.wasConnectionCloseCalled = true; + logger.info("Closed the amqp connection '%s' on the client.", this.connectionId); + } + } catch (err) { + err = err instanceof Error ? err : JSON.stringify(err); + logger.warning( + `An error occurred while closing the connection "${this.connectionId}":\n${err}` + ); + logErrorStackTrace(err); + throw err; + } } }); diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index 699a1ce2c4d6..64a337364d29 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -1,11 +1,13 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { TokenCredential, isTokenCredential } from "@azure/core-amqp"; +import { isTokenCredential, TokenCredential } from "@azure/core-amqp"; +import { ConnectionContext } from "./connectionContext"; +import { EventData } from "./eventData"; import { EventDataBatch, isEventDataBatch } from "./eventDataBatch"; -import { EventHubClient } from "./impl/eventHubClient"; +import { createConnectionContext } from "./impl/eventHubClient"; import { EventHubProperties, PartitionProperties } from "./managementClient"; -import { EventHubProducer } from "./sender"; +import { EventHubProducerOptions } from "./models/private"; import { CreateBatchOptions, EventHubClientOptions, @@ -14,7 +16,8 @@ import { GetPartitionPropertiesOptions, SendBatchOptions } from "./models/public"; -import { EventData } from "./eventData"; +import { EventHubProducer } from "./sender"; +import { throwErrorIfConnectionClosed } from "./util/error"; import { OperationOptions } from "./util/operationOptions"; /** @@ -30,8 +33,18 @@ import { OperationOptions } from "./util/operationOptions"; * */ export class EventHubProducerClient { - private _client: EventHubClient; + /** + * Describes the amqp connection context for the client. + */ + private _context: ConnectionContext; + /** + * The options passed by the user when creating the EventHubClient instance. + */ + private _clientOptions: EventHubClientOptions; + /** + * Map of partitionId to producers + */ private _producersMap: Map; /** @@ -40,7 +53,7 @@ export class EventHubProducerClient { * The name of the Event Hub instance for which this client is created. */ get eventHubName(): string { - return this._client.eventHubName; + return this._context.config.entityPath; } /** @@ -50,7 +63,7 @@ export class EventHubProducerClient { * This is likely to be similar to .servicebus.windows.net. */ get fullyQualifiedNamespace(): string { - return this._client.fullyQualifiedNamespace; + return this._context.config.host; } /** @@ -109,24 +122,18 @@ export class EventHubProducerClient { credentialOrOptions3?: TokenCredential | EventHubClientOptions, options4?: EventHubClientOptions ) { + this._context = createConnectionContext( + fullyQualifiedNamespaceOrConnectionString1, + eventHubNameOrOptions2, + credentialOrOptions3, + options4 + ); if (typeof eventHubNameOrOptions2 !== "string") { - this._client = new EventHubClient( - fullyQualifiedNamespaceOrConnectionString1, - eventHubNameOrOptions2 - ); + this._clientOptions = eventHubNameOrOptions2 || {}; } else if (!isTokenCredential(credentialOrOptions3)) { - this._client = new EventHubClient( - fullyQualifiedNamespaceOrConnectionString1, - eventHubNameOrOptions2, - credentialOrOptions3 - ); + this._clientOptions = credentialOrOptions3 || {}; } else { - this._client = new EventHubClient( - fullyQualifiedNamespaceOrConnectionString1, - eventHubNameOrOptions2, - credentialOrOptions3, - options4 - ); + this._clientOptions = options4 || {}; } this._producersMap = new Map(); @@ -154,7 +161,7 @@ export class EventHubProducerClient { let producer = this._producersMap.get(""); if (!producer) { - producer = this._client.createProducer(); + producer = this._createProducer(); this._producersMap.set("", producer); } @@ -231,7 +238,7 @@ export class EventHubProducerClient { let producer = this._producersMap.get(partitionId); if (!producer) { - producer = this._client.createProducer({ + producer = this._createProducer({ partitionId: partitionId === "" ? undefined : partitionId }); this._producersMap.set(partitionId, producer); @@ -246,7 +253,7 @@ export class EventHubProducerClient { * @throws Error if the underlying connection encounters an error while closing. */ async close(): Promise { - await this._client.close(); + await this._context.close(); for (const pair of this._producersMap) { await pair[1].close(); @@ -261,8 +268,13 @@ export class EventHubProducerClient { * @throws Error if the underlying connection has been closed, create a new EventHubProducerClient. * @throws AbortError if the operation is cancelled via the abortSignal. */ - getEventHubProperties(options: GetEventHubPropertiesOptions = {}): Promise { - return this._client.getProperties(options); + async getEventHubProperties( + options: GetEventHubPropertiesOptions = {} + ): Promise { + return this._context.managementSession!.getEventHubProperties({ + ...options, + retryOptions: this._clientOptions.retryOptions + }); } /** @@ -273,8 +285,12 @@ export class EventHubProducerClient { * @throws Error if the underlying connection has been closed, create a new EventHubProducerClient. * @throws AbortError if the operation is cancelled via the abortSignal. */ - getPartitionIds(options: GetPartitionIdsOptions = {}): Promise> { - return this._client.getPartitionIds(options); + async getPartitionIds(options: GetPartitionIdsOptions = {}): Promise> { + const eventHubProperties = await this._context.managementSession!.getEventHubProperties({ + ...options, + retryOptions: this._clientOptions.retryOptions + }); + return eventHubProperties.partitionIds; } /** @@ -285,10 +301,47 @@ export class EventHubProducerClient { * @throws Error if the underlying connection has been closed, create a new EventHubProducerClient. * @throws AbortError if the operation is cancelled via the abortSignal. */ - getPartitionProperties( + async getPartitionProperties( partitionId: string, options: GetPartitionPropertiesOptions = {} ): Promise { - return this._client.getPartitionProperties(partitionId, options); + return this._context.managementSession!.getPartitionProperties(partitionId, { + ...options, + retryOptions: this._clientOptions.retryOptions + }); + } + + /** + * Creates an Event Hub producer that can send events to the Event Hub. + * If `partitionId` is specified in the `options`, all event data sent using the producer + * will be sent to the specified partition. + * Otherwise, they are automatically routed to an available partition by the Event Hubs service. + * + * Automatic routing of partitions is recommended because: + * - The sending of events will be highly available. + * - The event data will be evenly distributed among all available partitions. + * + * @param options The set of options to apply when creating the producer. + * - `partitionId` : The identifier of the partition that the producer can be bound to. + * - `retryOptions` : The retry options used to govern retry attempts when an issue is encountered while sending events. + * A simple usage can be `{ "maxRetries": 4 }`. + * + * @throws Error if the underlying connection has been closed, create a new EventHubClient. + * @returns EventHubProducer + */ + private _createProducer(options?: EventHubProducerOptions): EventHubProducer { + if (!options) { + options = {}; + } + if (!options.retryOptions) { + options.retryOptions = this._clientOptions.retryOptions; + } + throwErrorIfConnectionClosed(this._context); + return new EventHubProducer( + this.eventHubName, + this.fullyQualifiedNamespace, + this._context, + options + ); } } diff --git a/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts b/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts index e10c791010ff..7c1c6d427941 100644 --- a/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts +++ b/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { logErrorStackTrace, logger } from "../log"; import { ConnectionConfig, EventHubConnectionConfig, @@ -16,13 +15,8 @@ import { import { ConnectionContext } from "../connectionContext"; import { EventHubProperties, PartitionProperties } from "../managementClient"; import { EventPosition } from "../eventPosition"; -import { EventHubProducer } from "../sender"; import { EventHubConsumer } from "../receiver"; import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "../util/error"; -import { getTracer } from "@azure/core-tracing"; -import { CanonicalCode, Span, SpanContext, SpanKind } from "@opentelemetry/api"; -import { getParentSpan } from "../util/operationOptions"; -import { EventHubProducerOptions, OperationNames } from "../models/private"; import { EventHubClientOptions, GetEventHubPropertiesOptions, @@ -147,86 +141,20 @@ export class EventHubClient { credentialOrOptions?: TokenCredential | EventHubClientOptions, options?: EventHubClientOptions ) { - let connectionString; - let config; - let credential: TokenCredential | SharedKeyCredential; - hostOrConnectionString = String(hostOrConnectionString); - - if (!isTokenCredential(credentialOrOptions)) { - const parsedCS = parseConnectionString(hostOrConnectionString); - if ( - !( - parsedCS.EntityPath || - (typeof eventHubNameOrOptions === "string" && eventHubNameOrOptions) - ) - ) { - throw new TypeError( - `Either provide "eventHubName" or the "connectionString": "${hostOrConnectionString}", ` + - `must contain "EntityPath=".` - ); - } - if ( - parsedCS.EntityPath && - typeof eventHubNameOrOptions === "string" && - eventHubNameOrOptions && - parsedCS.EntityPath !== eventHubNameOrOptions - ) { - throw new TypeError( - `The entity path "${parsedCS.EntityPath}" in connectionString: "${hostOrConnectionString}" ` + - `doesn't match with eventHubName: "${eventHubNameOrOptions}".` - ); - } - connectionString = hostOrConnectionString; - if (typeof eventHubNameOrOptions !== "string") { - // connectionstring and/or options were passed to constructor - config = EventHubConnectionConfig.create(connectionString); - options = eventHubNameOrOptions; - } else { - // connectionstring, eventHubName and/or options were passed to constructor - const eventHubName = eventHubNameOrOptions; - config = EventHubConnectionConfig.create(connectionString, eventHubName); - options = credentialOrOptions; - } - // Since connectionstring was passed, create a SharedKeyCredential - credential = new SharedKeyCredential(config.sharedAccessKeyName, config.sharedAccessKey); + this._context = createConnectionContext( + hostOrConnectionString, + eventHubNameOrOptions, + credentialOrOptions, + options + ); + this.endpoint = this._context.config.endpoint; + if (typeof eventHubNameOrOptions !== "string") { + this._clientOptions = eventHubNameOrOptions || {}; + } else if (!isTokenCredential(credentialOrOptions)) { + this._clientOptions = credentialOrOptions || {}; } else { - // host, eventHubName, a TokenCredential and/or options were passed to constructor - const eventHubName = eventHubNameOrOptions; - let host = hostOrConnectionString; - credential = credentialOrOptions; - if (!eventHubName) { - throw new TypeError(`"eventHubName" is missing`); - } - - if (!host.endsWith("/")) host += "/"; - connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue;EntityPath=${eventHubName}`; - config = EventHubConnectionConfig.create(connectionString); + this._clientOptions = options || {}; } - - ConnectionConfig.validate(config); - - this.endpoint = config.endpoint; - - this._clientOptions = options || {}; - this._context = ConnectionContext.create(config, credential, this._clientOptions); - } - - private _createClientSpan( - operationName: OperationNames, - parentSpan?: Span | SpanContext | null, - internal: boolean = false - ): Span { - const tracer = getTracer(); - const span = tracer.startSpan(`Azure.EventHubs.${operationName}`, { - kind: internal ? SpanKind.INTERNAL : SpanKind.CLIENT, - parent: parentSpan - }); - - span.setAttribute("az.namespace", "Microsoft.EventHub"); - span.setAttribute("message_bus.destination", this.eventHubName); - span.setAttribute("peer.address", this.endpoint); - - return span; } /** @@ -236,66 +164,7 @@ export class EventHubClient { * @throws Error if the underlying connection encounters an error while closing. */ async close(): Promise { - try { - if (this._context.connection.isOpen()) { - // Close all the senders. - for (const senderName of Object.keys(this._context.senders)) { - await this._context.senders[senderName].close(); - } - // Close all the receivers. - for (const receiverName of Object.keys(this._context.receivers)) { - await this._context.receivers[receiverName].close(); - } - // Close the cbs session; - await this._context.cbsSession.close(); - // Close the management session - await this._context.managementSession!.close(); - await this._context.connection.close(); - this._context.wasConnectionCloseCalled = true; - logger.info("Closed the amqp connection '%s' on the client.", this._context.connectionId); - } - } catch (err) { - err = err instanceof Error ? err : JSON.stringify(err); - logger.warning( - `An error occurred while closing the connection "${this._context.connectionId}":\n${err}` - ); - logErrorStackTrace(err); - throw err; - } - } - - /** - * Creates an Event Hub producer that can send events to the Event Hub. - * If `partitionId` is specified in the `options`, all event data sent using the producer - * will be sent to the specified partition. - * Otherwise, they are automatically routed to an available partition by the Event Hubs service. - * - * Automatic routing of partitions is recommended because: - * - The sending of events will be highly available. - * - The event data will be evenly distributed among all available partitions. - * - * @param options The set of options to apply when creating the producer. - * - `partitionId` : The identifier of the partition that the producer can be bound to. - * - `retryOptions` : The retry options used to govern retry attempts when an issue is encountered while sending events. - * A simple usage can be `{ "maxRetries": 4 }`. - * - * @throws Error if the underlying connection has been closed, create a new EventHubClient. - * @returns EventHubProducer - */ - createProducer(options?: EventHubProducerOptions): EventHubProducer { - if (!options) { - options = {}; - } - if (!options.retryOptions) { - options.retryOptions = this._clientOptions.retryOptions; - } - throwErrorIfConnectionClosed(this._context); - return new EventHubProducer( - this.eventHubName, - this.fullyQualifiedNamespace, - this._context, - options - ); + return this._context.close(); } /** @@ -364,29 +233,10 @@ export class EventHubClient { * @throws AbortError if the operation is cancelled via the abortSignal3. */ async getProperties(options: GetEventHubPropertiesOptions = {}): Promise { - throwErrorIfConnectionClosed(this._context); - const clientSpan = this._createClientSpan( - "getEventHubProperties", - getParentSpan(options.tracingOptions) - ); - try { - const result = await this._context.managementSession!.getHubRuntimeInformation({ - retryOptions: this._clientOptions.retryOptions, - abortSignal: options.abortSignal - }); - clientSpan.setStatus({ code: CanonicalCode.OK }); - return result; - } catch (err) { - clientSpan.setStatus({ - code: CanonicalCode.UNKNOWN, - message: err.message - }); - logger.warning("An error occurred while getting the hub runtime information: %O", err); - logErrorStackTrace(err); - throw err; - } finally { - clientSpan.end(); - } + return this._context.managementSession!.getEventHubProperties({ + retryOptions: this._clientOptions.retryOptions, + abortSignal: options.abortSignal + }); } /** @@ -397,34 +247,8 @@ export class EventHubClient { * @throws AbortError if the operation is cancelled via the abortSignal. */ async getPartitionIds(options: GetPartitionIdsOptions): Promise> { - throwErrorIfConnectionClosed(this._context); - const clientSpan = this._createClientSpan( - "getPartitionIds", - getParentSpan(options.tracingOptions), - true - ); - try { - const runtimeInfo = await this.getProperties({ - ...options, - tracingOptions: { - spanOptions: { - parent: clientSpan.context() - } - } - }); - clientSpan.setStatus({ code: CanonicalCode.OK }); - return runtimeInfo.partitionIds; - } catch (err) { - clientSpan.setStatus({ - code: CanonicalCode.UNKNOWN, - message: err.message - }); - logger.warning("An error occurred while getting the partition ids: %O", err); - logErrorStackTrace(err); - throw err; - } finally { - clientSpan.end(); - } + const properties = await this.getProperties(options); + return properties.partitionIds; } /** @@ -439,35 +263,73 @@ export class EventHubClient { partitionId: string, options: GetPartitionPropertiesOptions = {} ): Promise { - throwErrorIfConnectionClosed(this._context); - throwTypeErrorIfParameterMissing( - this._context.connectionId, - "getPartitionProperties", - "partitionId", - partitionId - ); - partitionId = String(partitionId); - const clientSpan = this._createClientSpan( - "getPartitionProperties", - getParentSpan(options.tracingOptions) - ); - try { - const result = await this._context.managementSession!.getPartitionProperties(partitionId, { - retryOptions: this._clientOptions.retryOptions, - abortSignal: options.abortSignal - }); - clientSpan.setStatus({ code: CanonicalCode.OK }); - return result; - } catch (err) { - clientSpan.setStatus({ - code: CanonicalCode.UNKNOWN, - message: err.message - }); - logger.warning("An error occurred while getting the partition information: %O", err); - logErrorStackTrace(err); - throw err; - } finally { - clientSpan.end(); + return this._context.managementSession!.getPartitionProperties(partitionId, { + retryOptions: this._clientOptions.retryOptions, + abortSignal: options.abortSignal + }); + } +} + +export function createConnectionContext( + hostOrConnectionString: string, + eventHubNameOrOptions?: string | EventHubClientOptions, + credentialOrOptions?: TokenCredential | EventHubClientOptions, + options?: EventHubClientOptions +): ConnectionContext { + let connectionString; + let config; + let credential: TokenCredential | SharedKeyCredential; + hostOrConnectionString = String(hostOrConnectionString); + + if (!isTokenCredential(credentialOrOptions)) { + const parsedCS = parseConnectionString(hostOrConnectionString); + if ( + !(parsedCS.EntityPath || (typeof eventHubNameOrOptions === "string" && eventHubNameOrOptions)) + ) { + throw new TypeError( + `Either provide "eventHubName" or the "connectionString": "${hostOrConnectionString}", ` + + `must contain "EntityPath=".` + ); + } + if ( + parsedCS.EntityPath && + typeof eventHubNameOrOptions === "string" && + eventHubNameOrOptions && + parsedCS.EntityPath !== eventHubNameOrOptions + ) { + throw new TypeError( + `The entity path "${parsedCS.EntityPath}" in connectionString: "${hostOrConnectionString}" ` + + `doesn't match with eventHubName: "${eventHubNameOrOptions}".` + ); } + connectionString = hostOrConnectionString; + if (typeof eventHubNameOrOptions !== "string") { + // connectionstring and/or options were passed to constructor + config = EventHubConnectionConfig.create(connectionString); + options = eventHubNameOrOptions; + } else { + // connectionstring, eventHubName and/or options were passed to constructor + const eventHubName = eventHubNameOrOptions; + config = EventHubConnectionConfig.create(connectionString, eventHubName); + options = credentialOrOptions; + } + // Since connectionstring was passed, create a SharedKeyCredential + credential = new SharedKeyCredential(config.sharedAccessKeyName, config.sharedAccessKey); + } else { + // host, eventHubName, a TokenCredential and/or options were passed to constructor + const eventHubName = eventHubNameOrOptions; + let host = hostOrConnectionString; + credential = credentialOrOptions; + if (!eventHubName) { + throw new TypeError(`"eventHubName" is missing`); + } + + if (!host.endsWith("/")) host += "/"; + connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue;EntityPath=${eventHubName}`; + config = EventHubConnectionConfig.create(connectionString); } + + ConnectionConfig.validate(config); + + return ConnectionContext.create(config, credential, options); } diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index bad5a4a7f599..8398402d901d 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -28,6 +28,11 @@ import { LinkEntity } from "./linkEntity"; import { logErrorStackTrace, logger } from "./log"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; import { AbortError, AbortSignalLike } from "@azure/abort-controller"; +import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error"; +import { OperationNames } from "./models/private"; +import { Span, SpanContext, SpanKind, CanonicalCode } from "@opentelemetry/api"; +import { getParentSpan, OperationOptions } from "./util/operationOptions"; +import { getTracer } from "@azure/core-tracing"; /** * Describes the runtime information of an Event Hub. */ @@ -158,48 +163,52 @@ export class ManagementClient extends LinkEntity { * @param connection - The established amqp connection * @returns */ - async getHubRuntimeInformation(options?: { - retryOptions?: RetryOptions; - abortSignal?: AbortSignalLike; - }): Promise { - if (!options) { - options = {}; - } - const securityToken = await this.getSecurityToken(); - const request: Message = { - body: Buffer.from(JSON.stringify([])), - message_id: uuid(), - reply_to: this.replyTo, - application_properties: { - operation: Constants.readOperation, - name: this.entityPath as string, - type: `${Constants.vendorString}:${Constants.eventHub}`, - security_token: securityToken?.token - } - }; - - const info: any = await this._makeManagementRequest(request, { - ...options, - requestName: "getHubRuntimeInformation" - }); - const runtimeInfo: EventHubProperties = { - name: info.name, - createdOn: new Date(info.created_at), - partitionIds: info.partition_ids - }; - logger.verbose("[%s] The hub runtime info is: %O", this._context.connectionId, runtimeInfo); - return runtimeInfo; - } + async getEventHubProperties( + options: OperationOptions & { retryOptions?: RetryOptions } = {} + ): Promise { + throwErrorIfConnectionClosed(this._context); + const clientSpan = this._createClientSpan( + "getEventHubProperties", + getParentSpan(options.tracingOptions) + ); + try { + const securityToken = await this.getSecurityToken(); + const request: Message = { + body: Buffer.from(JSON.stringify([])), + message_id: uuid(), + reply_to: this.replyTo, + application_properties: { + operation: Constants.readOperation, + name: this.entityPath as string, + type: `${Constants.vendorString}:${Constants.eventHub}`, + security_token: securityToken?.token + } + }; - /** - * Provides an array of partitionIds. - * @ignore - * @param connection - The established amqp connection - * @returns - */ - async getPartitionIds(): Promise> { - const runtimeInfo = await this.getHubRuntimeInformation(); - return runtimeInfo.partitionIds; + const info: any = await this._makeManagementRequest(request, { + ...options, + requestName: "getHubRuntimeInformation" + }); + const runtimeInfo: EventHubProperties = { + name: info.name, + createdOn: new Date(info.created_at), + partitionIds: info.partition_ids + }; + logger.verbose("[%s] The hub runtime info is: %O", this._context.connectionId, runtimeInfo); + + clientSpan.setStatus({ code: CanonicalCode.OK }); + return runtimeInfo; + } catch (error) { + clientSpan.setStatus({ + code: CanonicalCode.UNKNOWN, + message: error.message + }); + logger.warning("An error occurred while getting the hub runtime information: %O", error); + logErrorStackTrace(error); + throw error; + } finally { + clientSpan.end(); + } } /** @@ -210,41 +219,67 @@ export class ManagementClient extends LinkEntity { */ async getPartitionProperties( partitionId: string, - options?: { retryOptions?: RetryOptions; abortSignal?: AbortSignalLike } + options: OperationOptions & { retryOptions?: RetryOptions } = {} ): Promise { - if (!options) { - options = {}; - } - const securityToken = await this.getSecurityToken(); - const request: Message = { - body: Buffer.from(JSON.stringify([])), - message_id: uuid(), - reply_to: this.replyTo, - application_properties: { - operation: Constants.readOperation, - name: this.entityPath as string, - type: `${Constants.vendorString}:${Constants.partition}`, - partition: `${partitionId}`, - security_token: securityToken?.token - } - }; + throwErrorIfConnectionClosed(this._context); + throwTypeErrorIfParameterMissing( + this._context.connectionId, + "getPartitionProperties", + "partitionId", + partitionId + ); + partitionId = String(partitionId); + + const clientSpan = this._createClientSpan( + "getPartitionProperties", + getParentSpan(options.tracingOptions) + ); - const info: any = await this._makeManagementRequest(request, { - ...options, - requestName: "getPartitionInformation" - }); + try { + const securityToken = await this.getSecurityToken(); + const request: Message = { + body: Buffer.from(JSON.stringify([])), + message_id: uuid(), + reply_to: this.replyTo, + application_properties: { + operation: Constants.readOperation, + name: this.entityPath as string, + type: `${Constants.vendorString}:${Constants.partition}`, + partition: `${partitionId}`, + security_token: securityToken?.token + } + }; - const partitionInfo: PartitionProperties = { - beginningSequenceNumber: info.begin_sequence_number, - eventHubName: info.name, - lastEnqueuedOffset: info.last_enqueued_offset, - lastEnqueuedOnUtc: new Date(info.last_enqueued_time_utc), - lastEnqueuedSequenceNumber: info.last_enqueued_sequence_number, - partitionId: info.partition, - isEmpty: info.is_partition_empty - }; - logger.verbose("[%s] The partition info is: %O.", this._context.connectionId, partitionInfo); - return partitionInfo; + const info: any = await this._makeManagementRequest(request, { + ...options, + requestName: "getPartitionInformation" + }); + + const partitionInfo: PartitionProperties = { + beginningSequenceNumber: info.begin_sequence_number, + eventHubName: info.name, + lastEnqueuedOffset: info.last_enqueued_offset, + lastEnqueuedOnUtc: new Date(info.last_enqueued_time_utc), + lastEnqueuedSequenceNumber: info.last_enqueued_sequence_number, + partitionId: info.partition, + isEmpty: info.is_partition_empty + }; + logger.verbose("[%s] The partition info is: %O.", this._context.connectionId, partitionInfo); + + clientSpan.setStatus({ code: CanonicalCode.OK }); + + return partitionInfo; + } catch (error) { + clientSpan.setStatus({ + code: CanonicalCode.UNKNOWN, + message: error.message + }); + logger.warning("An error occurred while getting the partition information: %O", error); + logErrorStackTrace(error); + throw error; + } finally { + clientSpan.end(); + } } /** @@ -471,4 +506,22 @@ export class ManagementClient extends LinkEntity { private _isMgmtRequestResponseLinkOpen(): boolean { return this._mgmtReqResLink! && this._mgmtReqResLink!.isOpen(); } + + private _createClientSpan( + operationName: OperationNames, + parentSpan?: Span | SpanContext | null, + internal: boolean = false + ): Span { + const tracer = getTracer(); + const span = tracer.startSpan(`Azure.EventHubs.${operationName}`, { + kind: internal ? SpanKind.INTERNAL : SpanKind.CLIENT, + parent: parentSpan + }); + + span.setAttribute("az.namespace", "Microsoft.EventHub"); + span.setAttribute("message_bus.destination", this._context.config.entityPath); + span.setAttribute("peer.address", this._context.config.endpoint); + + return span; + } } diff --git a/sdk/eventhub/event-hubs/test/client.spec.ts b/sdk/eventhub/event-hubs/test/client.spec.ts index 802652779207..de37e5515afc 100644 --- a/sdk/eventhub/event-hubs/test/client.spec.ts +++ b/sdk/eventhub/event-hubs/test/client.spec.ts @@ -492,7 +492,7 @@ describe("EventHubProducerClient User Agent String", function(): void { env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], env[EnvVarKeys.EVENTHUB_NAME] ); - testUserAgentString(producerClient["_client"]["_context"]); + testUserAgentString(producerClient["_context"]); await producerClient.close(); }); @@ -503,7 +503,7 @@ describe("EventHubProducerClient User Agent String", function(): void { env[EnvVarKeys.EVENTHUB_NAME], { userAgent: customUserAgent } ); - testUserAgentString(producerClient["_client"]["_context"], customUserAgent); + testUserAgentString(producerClient["_context"], customUserAgent); await producerClient.close(); }); }); diff --git a/sdk/eventhub/event-hubs/test/hubruntime.spec.ts b/sdk/eventhub/event-hubs/test/hubruntime.spec.ts index 4735ca38270c..04786443f7b3 100644 --- a/sdk/eventhub/event-hubs/test/hubruntime.spec.ts +++ b/sdk/eventhub/event-hubs/test/hubruntime.spec.ts @@ -116,13 +116,8 @@ describe("RuntimeInformation", function(): void { name: rootSpan.name, children: [ { - name: "Azure.EventHubs.getPartitionIds", - children: [ - { - name: "Azure.EventHubs.getEventHubProperties", - children: [] - } - ] + name: "Azure.EventHubs.getEventHubProperties", + children: [] } ] } @@ -158,13 +153,8 @@ describe("RuntimeInformation", function(): void { name: rootSpan.name, children: [ { - name: "Azure.EventHubs.getPartitionIds", - children: [ - { - name: "Azure.EventHubs.getEventHubProperties", - children: [] - } - ] + name: "Azure.EventHubs.getEventHubProperties", + children: [] } ] } diff --git a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts index 55f0da2a7be4..b46d69cbea32 100644 --- a/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts +++ b/sdk/eventhub/event-hubs/test/node/disconnects.spec.ts @@ -89,7 +89,7 @@ describe("disconnected", function() { describe("EventHubProducerClient", function() { it("runtimeInfo work after disconnect", async () => { const client = new EventHubProducerClient(service.connectionString, service.path); - const clientConnectionContext = client["_client"]["_context"]; + const clientConnectionContext = client["_context"]; await client.getPartitionIds({}); const originalConnectionId = clientConnectionContext.connectionId; @@ -108,7 +108,7 @@ describe("disconnected", function() { it("should send after a disconnect", async () => { const client = new EventHubProducerClient(service.connectionString, service.path); - const clientConnectionContext = client["_client"]["_context"]; + const clientConnectionContext = client["_context"]; await client.sendBatch([{ body: "test" }]); const originalConnectionId = clientConnectionContext.connectionId; @@ -126,7 +126,7 @@ describe("disconnected", function() { it("should not throw an uncaught exception", async () => { const client = new EventHubProducerClient(service.connectionString, service.path); - const clientConnectionContext = client["_client"]["_context"]; + const clientConnectionContext = client["_context"]; // Send an event to open the connection. await client.sendBatch([{ body: "test" }]); From 1c3c3a1a660ea166933c4f97ae04255a1bdc4a53 Mon Sep 17 00:00:00 2001 From: Ramya Achutha Rao Date: Fri, 12 Jun 2020 21:24:44 -0700 Subject: [PATCH 2/5] Plumb through operation options when using consumer client --- sdk/eventhub/event-hubs/src/impl/eventHubClient.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts b/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts index 7c1c6d427941..176b69c55eaf 100644 --- a/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts +++ b/sdk/eventhub/event-hubs/src/impl/eventHubClient.ts @@ -235,7 +235,7 @@ export class EventHubClient { async getProperties(options: GetEventHubPropertiesOptions = {}): Promise { return this._context.managementSession!.getEventHubProperties({ retryOptions: this._clientOptions.retryOptions, - abortSignal: options.abortSignal + ...options }); } @@ -265,7 +265,7 @@ export class EventHubClient { ): Promise { return this._context.managementSession!.getPartitionProperties(partitionId, { retryOptions: this._clientOptions.retryOptions, - abortSignal: options.abortSignal + ...options }); } } From 6b5100f8ebcd1cdb63f1997c47925d0e4407af50 Mon Sep 17 00:00:00 2001 From: Ramya Achutha Rao Date: Fri, 12 Jun 2020 23:36:02 -0700 Subject: [PATCH 3/5] Create producer directly without helper method --- .../event-hubs/src/eventHubProducerClient.ts | 46 ++++--------------- sdk/eventhub/event-hubs/src/sender.ts | 6 +-- 2 files changed, 11 insertions(+), 41 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index 64a337364d29..aa0765e74487 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -7,7 +7,6 @@ import { EventData } from "./eventData"; import { EventDataBatch, isEventDataBatch } from "./eventDataBatch"; import { createConnectionContext } from "./impl/eventHubClient"; import { EventHubProperties, PartitionProperties } from "./managementClient"; -import { EventHubProducerOptions } from "./models/private"; import { CreateBatchOptions, EventHubClientOptions, @@ -154,6 +153,8 @@ export class EventHubProducerClient { * @throws AbortError if the operation is cancelled via the abortSignal in the options. */ async createBatch(options?: CreateBatchOptions): Promise { + throwErrorIfConnectionClosed(this._context); + if (options && options.partitionId && options.partitionKey) { throw new Error("partitionId and partitionKey cannot both be set when creating a batch"); } @@ -161,7 +162,9 @@ export class EventHubProducerClient { let producer = this._producersMap.get(""); if (!producer) { - producer = this._createProducer(); + producer = new EventHubProducer(this._context, { + retryOptions: this._clientOptions.retryOptions + }); this._producersMap.set("", producer); } @@ -202,6 +205,8 @@ export class EventHubProducerClient { batch: EventDataBatch | EventData[], options: SendBatchOptions | OperationOptions = {} ): Promise { + throwErrorIfConnectionClosed(this._context); + let partitionId: string | undefined; let partitionKey: string | undefined; if (isEventDataBatch(batch)) { @@ -238,7 +243,8 @@ export class EventHubProducerClient { let producer = this._producersMap.get(partitionId); if (!producer) { - producer = this._createProducer({ + producer = new EventHubProducer(this._context, { + retryOptions: this._clientOptions.retryOptions, partitionId: partitionId === "" ? undefined : partitionId }); this._producersMap.set(partitionId, producer); @@ -310,38 +316,4 @@ export class EventHubProducerClient { retryOptions: this._clientOptions.retryOptions }); } - - /** - * Creates an Event Hub producer that can send events to the Event Hub. - * If `partitionId` is specified in the `options`, all event data sent using the producer - * will be sent to the specified partition. - * Otherwise, they are automatically routed to an available partition by the Event Hubs service. - * - * Automatic routing of partitions is recommended because: - * - The sending of events will be highly available. - * - The event data will be evenly distributed among all available partitions. - * - * @param options The set of options to apply when creating the producer. - * - `partitionId` : The identifier of the partition that the producer can be bound to. - * - `retryOptions` : The retry options used to govern retry attempts when an issue is encountered while sending events. - * A simple usage can be `{ "maxRetries": 4 }`. - * - * @throws Error if the underlying connection has been closed, create a new EventHubClient. - * @returns EventHubProducer - */ - private _createProducer(options?: EventHubProducerOptions): EventHubProducer { - if (!options) { - options = {}; - } - if (!options.retryOptions) { - options.retryOptions = this._clientOptions.retryOptions; - } - throwErrorIfConnectionClosed(this._context); - return new EventHubProducer( - this.eventHubName, - this.fullyQualifiedNamespace, - this._context, - options - ); - } } diff --git a/sdk/eventhub/event-hubs/src/sender.ts b/sdk/eventhub/event-hubs/src/sender.ts index 127b8c94cd22..270d20e1b79f 100644 --- a/sdk/eventhub/event-hubs/src/sender.ts +++ b/sdk/eventhub/event-hubs/src/sender.ts @@ -68,8 +68,6 @@ export class EventHubProducer { * @ignore */ constructor( - eventHubName: string, - fullyQualifiedNamespace: string, context: ConnectionContext, options?: EventHubProducerOptions ) { @@ -80,8 +78,8 @@ export class EventHubProducer { ? String(this._senderOptions.partitionId) : undefined; this._eventHubSender = EventHubSender.create(this._context, partitionId); - this._eventHubName = eventHubName; - this._fullyQualifiedNamespace = fullyQualifiedNamespace; + this._eventHubName = this._context.config.entityPath; + this._fullyQualifiedNamespace = this._context.config.host; } /** From d8620e01c04132efe5462d71eccee607f0e856cd Mon Sep 17 00:00:00 2001 From: Ramya Achutha Rao Date: Mon, 15 Jun 2020 12:27:25 -0700 Subject: [PATCH 4/5] Fix stale ts docs --- sdk/eventhub/event-hubs/src/managementClient.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index 8398402d901d..38523f238b22 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -160,8 +160,6 @@ export class ManagementClient extends LinkEntity { /** * Provides the eventhub runtime information. * @ignore - * @param connection - The established amqp connection - * @returns */ async getEventHubProperties( options: OperationOptions & { retryOptions?: RetryOptions } = {} @@ -214,7 +212,6 @@ export class ManagementClient extends LinkEntity { /** * Provides information about the specified partition. * @ignore - * @param connection - The established amqp connection * @param partitionId Partition ID for which partition information is required. */ async getPartitionProperties( From 26b61431f2b2e7bbeb5c5fb3acf185a7dbf188a8 Mon Sep 17 00:00:00 2001 From: Ramya Achutha Rao Date: Mon, 15 Jun 2020 12:29:31 -0700 Subject: [PATCH 5/5] Avoid making async for no net benefits --- sdk/eventhub/event-hubs/src/eventHubProducerClient.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index aa0765e74487..ac353914b1fc 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -274,7 +274,7 @@ export class EventHubProducerClient { * @throws Error if the underlying connection has been closed, create a new EventHubProducerClient. * @throws AbortError if the operation is cancelled via the abortSignal. */ - async getEventHubProperties( + getEventHubProperties( options: GetEventHubPropertiesOptions = {} ): Promise { return this._context.managementSession!.getEventHubProperties({ @@ -291,12 +291,13 @@ export class EventHubProducerClient { * @throws Error if the underlying connection has been closed, create a new EventHubProducerClient. * @throws AbortError if the operation is cancelled via the abortSignal. */ - async getPartitionIds(options: GetPartitionIdsOptions = {}): Promise> { - const eventHubProperties = await this._context.managementSession!.getEventHubProperties({ + getPartitionIds(options: GetPartitionIdsOptions = {}): Promise> { + return this._context.managementSession!.getEventHubProperties({ ...options, retryOptions: this._clientOptions.retryOptions + }).then(eventHubProperties => { + return eventHubProperties.partitionIds; }); - return eventHubProperties.partitionIds; } /** @@ -307,7 +308,7 @@ export class EventHubProducerClient { * @throws Error if the underlying connection has been closed, create a new EventHubProducerClient. * @throws AbortError if the operation is cancelled via the abortSignal. */ - async getPartitionProperties( + getPartitionProperties( partitionId: string, options: GetPartitionPropertiesOptions = {} ): Promise {