Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Remove the middle layer of EventHubConsumer #9623

Merged
merged 1 commit into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 176 additions & 2 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@ import {
ReceiverOptions as RheaReceiverOptions,
types
} from "rhea-promise";
import { Constants, MessagingError, delay, translate } from "@azure/core-amqp";
import {
Constants,
MessagingError,
delay,
translate,
RetryConfig,
RetryOperationType,
retry
} from "@azure/core-amqp";
import { EventDataInternal, ReceivedEventData, fromAmqpMessage } from "./eventData";
import { EventHubConsumerOptions } from "./models/private";
import { ConnectionContext } from "./connectionContext";
Expand Down Expand Up @@ -147,7 +155,10 @@ export class EventHubReceiver extends LinkEntity {
* @property _isStreaming Indicated if messages are being received in streaming mode.
*/
private _isStreaming: boolean = false;

/**
* @property Denotes if close() was called on this receiver
*/
private _isClosed: boolean = false;
/**
* @property Returns sequenceNumber of the last event received from the service. This will not match the
* last event received by `EventHubConsumer` when the `_internalQueue` is not empty
Expand All @@ -165,6 +176,15 @@ export class EventHubReceiver extends LinkEntity {
return this._isReceivingMessages;
}

/**
* @property The last enqueued event information. This property will only
* be enabled when `trackLastEnqueuedEventProperties` option is set to true
* @readonly
*/
public get lastEnqueuedEventProperties(): LastEnqueuedEventProperties {
return this.runtimeInfo;
}

/**
* Instantiates a receiver that can be used to receive events over an AMQP receiver link in
* either batching or streaming mode.
Expand Down Expand Up @@ -373,6 +393,8 @@ export class EventHubReceiver extends LinkEntity {
logger.warning(msg);
logErrorStackTrace(err);
throw err;
} finally {
this._isClosed = true;
}
}

Expand Down Expand Up @@ -622,4 +644,156 @@ export class EventHubReceiver extends LinkEntity {
}
return rcvrOptions;
}

/**
* Returns a promise that resolves to an array of events received from the service.
*
* @param maxMessageCount The maximum number of messages to receive.
* @param maxWaitTimeInSeconds The maximum amount of time to wait to build up the requested message count;
* If not provided, it defaults to 60 seconds.
* @param abortSignal An implementation of the `AbortSignalLike` interface to signal the request to cancel the operation.
* For example, use the @azure/abort-controller to create an `AbortSignal`.
*
* @returns Promise<ReceivedEventData[]>.
* @throws AbortError if the operation is cancelled via the abortSignal.
* @throws MessagingError if an error is encountered while receiving a message.
* @throws Error if the underlying connection or receiver has been closed.
* Create a new EventHubConsumer using the EventHubClient createConsumer method.
* @throws Error if the receiver is already receiving messages.
*/
async receiveBatch(
maxMessageCount: number,
maxWaitTimeInSeconds: number = 60,
abortSignal?: AbortSignalLike
): Promise<ReceivedEventData[]> {
// store events across multiple retries
const receivedEvents: ReceivedEventData[] = [];

const retrieveEvents = (): Promise<ReceivedEventData[]> => {
return new Promise(async (resolve, reject) => {
// if this consumer was closed,
// resolve the operation's promise with the events collected thus far in case
// the promise hasn't already been resolved.
if (this._isClosed || this._context.wasConnectionCloseCalled) {
return resolve(receivedEvents);
}

let timer: any;
const logOnAbort = (): void => {
const name = this.name;
const address = this.address;
const desc: string =
`[${this._context.connectionId}] The request operation on the Receiver "${name}" with ` +
`address "${address}" has been cancelled by the user.`;
// Cancellation is intentional so logging to 'info'.
logger.info(desc);
};

const rejectOnAbort = async (): Promise<void> => {
logOnAbort();
try {
await this.close();
} finally {
return reject(new AbortError("The receive operation has been cancelled by the user."));
}
};

// operation has been cancelled, so exit immediately
if (abortSignal && abortSignal.aborted) {
return await rejectOnAbort();
}

// updates the prefetch count so that the baseConsumer adds
// the correct number of credits to receive the same number of events.
const prefetchCount = Math.max(maxMessageCount - receivedEvents.length, 0);
if (prefetchCount === 0) {
return resolve(receivedEvents);
}

logger.verbose(
"[%s] Receiver '%s', setting the prefetch count to %d.",
this._context.connectionId,
this.name,
prefetchCount
);

const cleanUpBeforeReturn = (): void => {
this.clearHandlers();
clearTimeout(timer);
};

const onAbort = (): void => {
clearTimeout(timer);
rejectOnAbort();
};

this.registerHandlers(
(eventData) => {
receivedEvents.push(eventData);

// resolve the operation's promise after the requested
// number of events are received.
if (receivedEvents.length === maxMessageCount) {
logger.info(
"[%s] Batching Receiver '%s', %d messages received within %d seconds.",
this._context.connectionId,
this.name,
receivedEvents.length,
maxWaitTimeInSeconds
);
cleanUpBeforeReturn();
resolve(receivedEvents);
}
},
(err) => {
cleanUpBeforeReturn();
if (err.name === "AbortError") {
rejectOnAbort();
} else {
reject(err);
}
},
maxMessageCount - receivedEvents.length,
false,
abortSignal,
onAbort
);

const addTimeout = (): void => {
const msg = "[%s] Setting the wait timer for %d seconds for receiver '%s'.";
logger.verbose(msg, this._context.connectionId, maxWaitTimeInSeconds, this.name);

// resolve the operation's promise after the requested
// max number of seconds have passed.
timer = setTimeout(() => {
logger.info(
"[%s] Batching Receiver '%s', %d messages received when max wait time in seconds %d is over.",
this._context.connectionId,
this.name,
receivedEvents.length,
maxWaitTimeInSeconds
);
cleanUpBeforeReturn();
resolve(receivedEvents);
}, maxWaitTimeInSeconds * 1000);
};

addTimeout();
if (abortSignal && !abortSignal.aborted) {
abortSignal.addEventListener("abort", onAbort);
}
});
};

const retryOptions = this.options.retryOptions || {};
const config: RetryConfig<ReceivedEventData[]> = {
connectionHost: this._context.config.host,
connectionId: this._context.connectionId,
operation: retrieveEvents,
operationType: RetryOperationType.receiveMessage,
abortSignal: abortSignal,
retryOptions: retryOptions
};
return retry<ReceivedEventData[]>(config);
}
}
6 changes: 3 additions & 3 deletions sdk/eventhub/event-hubs/src/partitionPump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { CommonEventProcessorOptions } from "./models/private";
import { CloseReason } from "./models/public";
import { EventPosition } from "./eventPosition";
import { PartitionProcessor } from "./partitionProcessor";
import { EventHubConsumer } from "./receiver";
import { EventHubReceiver } from "./eventHubReceiver";
import { AbortController } from "@azure/abort-controller";
import { MessagingError } from "@azure/core-amqp";
import { OperationOptions, getParentSpan } from "./util/operationOptions";
Expand All @@ -23,7 +23,7 @@ import { ConnectionContext } from "./connectionContext";
export class PartitionPump {
private _partitionProcessor: PartitionProcessor;
private _processorOptions: CommonEventProcessorOptions;
private _receiver: EventHubConsumer | undefined;
private _receiver: EventHubReceiver | undefined;
private _isReceiving: boolean = false;
private _isStopped: boolean = false;
private _abortController: AbortController;
Expand Down Expand Up @@ -60,7 +60,7 @@ export class PartitionPump {
}

private async _receiveEvents(partitionId: string): Promise<void> {
this._receiver = new EventHubConsumer(
this._receiver = new EventHubReceiver(
this._context,
this._partitionProcessor.consumerGroup,
partitionId,
Expand Down
Loading