Skip to content

Commit

Permalink
[Event Hubs] Remove the middle layer of EventHubConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
ramya-rao-a committed Jun 23, 2020
1 parent fbf0aad commit e58bafb
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 378 deletions.
178 changes: 176 additions & 2 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@ import {
ReceiverOptions as RheaReceiverOptions,
types
} from "rhea-promise";
import { Constants, MessagingError, delay, translate } from "@azure/core-amqp";
import {
Constants,
MessagingError,
delay,
translate,
RetryConfig,
RetryOperationType,
retry
} from "@azure/core-amqp";
import { EventDataInternal, ReceivedEventData, fromAmqpMessage } from "./eventData";
import { EventHubConsumerOptions } from "./models/private";
import { ConnectionContext } from "./connectionContext";
Expand Down Expand Up @@ -147,7 +155,10 @@ export class EventHubReceiver extends LinkEntity {
* @property _isStreaming Indicated if messages are being received in streaming mode.
*/
private _isStreaming: boolean = false;

/**
* @property Denotes if close() was called on this receiver
*/
private _isClosed: boolean = false;
/**
* @property Returns sequenceNumber of the last event received from the service. This will not match the
* last event received by `EventHubConsumer` when the `_internalQueue` is not empty
Expand All @@ -165,6 +176,15 @@ export class EventHubReceiver extends LinkEntity {
return this._isReceivingMessages;
}

/**
* @property The last enqueued event information. This property will only
* be enabled when `trackLastEnqueuedEventProperties` option is set to true
* @readonly
*/
public get lastEnqueuedEventProperties(): LastEnqueuedEventProperties {
return this.runtimeInfo;
}

/**
* Instantiates a receiver that can be used to receive events over an AMQP receiver link in
* either batching or streaming mode.
Expand Down Expand Up @@ -373,6 +393,8 @@ export class EventHubReceiver extends LinkEntity {
logger.warning(msg);
logErrorStackTrace(err);
throw err;
} finally {
this._isClosed = true;
}
}

Expand Down Expand Up @@ -622,4 +644,156 @@ export class EventHubReceiver extends LinkEntity {
}
return rcvrOptions;
}

/**
* Returns a promise that resolves to an array of events received from the service.
*
* @param maxMessageCount The maximum number of messages to receive.
* @param maxWaitTimeInSeconds The maximum amount of time to wait to build up the requested message count;
* If not provided, it defaults to 60 seconds.
* @param abortSignal An implementation of the `AbortSignalLike` interface to signal the request to cancel the operation.
* For example, use the @azure/abort-controller to create an `AbortSignal`.
*
* @returns Promise<ReceivedEventData[]>.
* @throws AbortError if the operation is cancelled via the abortSignal.
* @throws MessagingError if an error is encountered while receiving a message.
* @throws Error if the underlying connection or receiver has been closed.
* Create a new EventHubConsumer using the EventHubClient createConsumer method.
* @throws Error if the receiver is already receiving messages.
*/
async receiveBatch(
maxMessageCount: number,
maxWaitTimeInSeconds: number = 60,
abortSignal?: AbortSignalLike
): Promise<ReceivedEventData[]> {
// store events across multiple retries
const receivedEvents: ReceivedEventData[] = [];

const retrieveEvents = (): Promise<ReceivedEventData[]> => {
return new Promise(async (resolve, reject) => {
// if this consumer was closed,
// resolve the operation's promise with the events collected thus far in case
// the promise hasn't already been resolved.
if (this._isClosed || this._context.wasConnectionCloseCalled) {
return resolve(receivedEvents);
}

let timer: any;
const logOnAbort = (): void => {
const name = this.name;
const address = this.address;
const desc: string =
`[${this._context.connectionId}] The request operation on the Receiver "${name}" with ` +
`address "${address}" has been cancelled by the user.`;
// Cancellation is intentional so logging to 'info'.
logger.info(desc);
};

const rejectOnAbort = async (): Promise<void> => {
logOnAbort();
try {
await this.close();
} finally {
return reject(new AbortError("The receive operation has been cancelled by the user."));
}
};

// operation has been cancelled, so exit immediately
if (abortSignal && abortSignal.aborted) {
return await rejectOnAbort();
}

// updates the prefetch count so that the baseConsumer adds
// the correct number of credits to receive the same number of events.
const prefetchCount = Math.max(maxMessageCount - receivedEvents.length, 0);
if (prefetchCount === 0) {
return resolve(receivedEvents);
}

logger.verbose(
"[%s] Receiver '%s', setting the prefetch count to %d.",
this._context.connectionId,
this.name,
prefetchCount
);

const cleanUpBeforeReturn = (): void => {
this.clearHandlers();
clearTimeout(timer);
};

const onAbort = (): void => {
clearTimeout(timer);
rejectOnAbort();
};

this.registerHandlers(
(eventData) => {
receivedEvents.push(eventData);

// resolve the operation's promise after the requested
// number of events are received.
if (receivedEvents.length === maxMessageCount) {
logger.info(
"[%s] Batching Receiver '%s', %d messages received within %d seconds.",
this._context.connectionId,
this.name,
receivedEvents.length,
maxWaitTimeInSeconds
);
cleanUpBeforeReturn();
resolve(receivedEvents);
}
},
(err) => {
cleanUpBeforeReturn();
if (err.name === "AbortError") {
rejectOnAbort();
} else {
reject(err);
}
},
maxMessageCount - receivedEvents.length,
false,
abortSignal,
onAbort
);

const addTimeout = (): void => {
const msg = "[%s] Setting the wait timer for %d seconds for receiver '%s'.";
logger.verbose(msg, this._context.connectionId, maxWaitTimeInSeconds, this.name);

// resolve the operation's promise after the requested
// max number of seconds have passed.
timer = setTimeout(() => {
logger.info(
"[%s] Batching Receiver '%s', %d messages received when max wait time in seconds %d is over.",
this._context.connectionId,
this.name,
receivedEvents.length,
maxWaitTimeInSeconds
);
cleanUpBeforeReturn();
resolve(receivedEvents);
}, maxWaitTimeInSeconds * 1000);
};

addTimeout();
if (abortSignal && !abortSignal.aborted) {
abortSignal.addEventListener("abort", onAbort);
}
});
};

const retryOptions = this.options.retryOptions || {};
const config: RetryConfig<ReceivedEventData[]> = {
connectionHost: this._context.config.host,
connectionId: this._context.connectionId,
operation: retrieveEvents,
operationType: RetryOperationType.receiveMessage,
abortSignal: abortSignal,
retryOptions: retryOptions
};
return retry<ReceivedEventData[]>(config);
}
}
6 changes: 3 additions & 3 deletions sdk/eventhub/event-hubs/src/partitionPump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { CommonEventProcessorOptions } from "./models/private";
import { CloseReason } from "./models/public";
import { EventPosition } from "./eventPosition";
import { PartitionProcessor } from "./partitionProcessor";
import { EventHubConsumer } from "./receiver";
import { EventHubReceiver } from "./eventHubReceiver";
import { AbortController } from "@azure/abort-controller";
import { MessagingError } from "@azure/core-amqp";
import { OperationOptions, getParentSpan } from "./util/operationOptions";
Expand All @@ -23,7 +23,7 @@ import { ConnectionContext } from "./connectionContext";
export class PartitionPump {
private _partitionProcessor: PartitionProcessor;
private _processorOptions: CommonEventProcessorOptions;
private _receiver: EventHubConsumer | undefined;
private _receiver: EventHubReceiver | undefined;
private _isReceiving: boolean = false;
private _isStopped: boolean = false;
private _abortController: AbortController;
Expand Down Expand Up @@ -60,7 +60,7 @@ export class PartitionPump {
}

private async _receiveEvents(partitionId: string): Promise<void> {
this._receiver = new EventHubConsumer(
this._receiver = new EventHubReceiver(
this._context,
this._partitionProcessor.consumerGroup,
partitionId,
Expand Down
Loading

0 comments on commit e58bafb

Please sign in to comment.