Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Ensure receiver is functional #3356

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions sdk/eventhub/event-hubs/samples/receiveEventsLoop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,40 @@ import { EventHubClient, EventPosition } from "@azure/event-hubs";

// Define connection string and related Event Hubs entity name here
const connectionString = "";
const eventHubsName = "";
const eventHubName = "";

async function main(): Promise<void> {
const client = EventHubClient.createFromConnectionString(connectionString, eventHubsName);
const client = EventHubClient.createFromConnectionString(connectionString, eventHubName);
const partitionIds = await client.getPartitionIds();
let eventPosition = EventPosition.fromStart();
const receiver = client.createReceiver(partitionIds[0], {
eventPosition: EventPosition.fromFirstAvailableEvent(),
consumerGroup: "$Default"
});
const batchSize = 1;

for (let i = 0; i < 10; i++) {
const events = await client.receiveBatch(partitionIds[0], batchSize, 5, {
eventPosition: eventPosition,
consumerGroup: "$Default"
});
if (!events.length) {
console.log("No more events to receive");
break;
try {
for (let i = 0; i < 5; i++) {
const events = await receiver.receiveBatch(batchSize, 5);
if (!events.length) {
console.log("No more events to receive");
break;
}
console.log(`Received events: ${events.map(event => event.body)}`);
}
eventPosition = EventPosition.fromSequenceNumber(events[events.length - 1].sequenceNumber!);
console.log(`Received events #${i}: ${events.map(event => event.body)}`);

let receivedIteratorEvent = 0;
for await (const events of receiver.getEventIterator({ preFetchCount: 5 })) {
receivedIteratorEvent++;
console.log(`Received event: ${events.body}`);
if (receivedIteratorEvent === 5) {
break;
}
}

await receiver.close();
} finally {
await client.close();
}
await client.close();
}

main().catch(err => {
Expand Down
26 changes: 15 additions & 11 deletions sdk/eventhub/event-hubs/samples/receiveEventsStreaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@
to populate Event Hubs before running this sample.
*/

import { EventHubClient, EventPosition, OnMessage, OnError, MessagingError, delay, EventData } from "@azure/event-hubs";
import { EventHubClient, OnMessage, OnError, MessagingError, delay, EventData, EventPosition } from "../SRC";

// Define connection string and related Event Hubs entity name here
const connectionString = "";
const eventHubsName = "";
const eventHubName = "";

async function main(): Promise<void> {
const client = EventHubClient.createFromConnectionString(connectionString, eventHubsName);
const client = EventHubClient.createFromConnectionString(connectionString, eventHubName);
const partitionIds = await client.getPartitionIds();
const receiver = client.createReceiver(partitionIds[0], {
eventPosition: EventPosition.fromFirstAvailableEvent(),
consumerGroup: "$Default"
});

const onMessageHandler: OnMessage = (brokeredMessage: EventData) => {
console.log(`Received event: ${brokeredMessage.body}`);
Expand All @@ -25,15 +29,15 @@ async function main(): Promise<void> {
console.log("Error occurred: ", err);
};

const rcvHandler = client.receive(partitionIds[0], onMessageHandler, onErrorHandler, {
eventPosition: EventPosition.fromStart(),
consumerGroup: "$Default"
});
try {
const rcvHandler = receiver.receive(onMessageHandler, onErrorHandler);

// Waiting long enough before closing the receiver to receive event
await delay(5000);
await rcvHandler.stop();
await client.close();
// Waiting long enough before closing the receiver to receive event
await delay(5000);
await rcvHandler.stop();
} finally {
await client.close();
}
}

main().catch(err => {
Expand Down
34 changes: 18 additions & 16 deletions sdk/eventhub/event-hubs/samples/sendEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,26 @@ const listOfScientists = [
async function main(): Promise<void> {
const client = EventHubClient.createFromConnectionString(connectionString, eventHubName);
const partitionIds = await client.getPartitionIds();
const sender = client.createSender(partitionIds[0]);
const sender = client.createSender({ partitionId: partitionIds[0] });
const events: EventData[] = [];
// NOTE: For receiving events from Azure Stream Analytics, please send Events to an EventHub
// where the body is a JSON object/array.
// const events = [
// { body: { "message": "Hello World 1" }, applicationProperties: { id: "Some id" }, partitionKey: "pk786" },
// { body: { "message": "Hello World 2" } },
// { body: { "message": "Hello World 3" } }
// ];
for (let index = 0; index < listOfScientists.length; index++) {
const scientist = listOfScientists[index];
events.push({ body: `${scientist.firstName} ${scientist.name}` });
try {
// NOTE: For receiving events from Azure Stream Analytics, please send Events to an EventHub
// where the body is a JSON object/array.
// const events = [
// { body: { "message": "Hello World 1" }, applicationProperties: { id: "Some id" }, partitionKey: "pk786" },
// { body: { "message": "Hello World 2" } },
// { body: { "message": "Hello World 3" } }
// ];
for (let index = 0; index < listOfScientists.length; index++) {
const scientist = listOfScientists[index];
events.push({ body: `${scientist.firstName} ${scientist.name}` });
}
console.log("Sending batch events...");

await sender.send(events);
} finally {
await client.close();
}
console.log("Sending batch events...");

await sender.send(events);

await client.close();
}

main().catch(err => {
Expand Down
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ export {
TokenProvider,
TokenInfo,
AadTokenProvider,
SasTokenProvider
SasTokenProvider,
delay
} from "@azure/amqp-common";
60 changes: 49 additions & 11 deletions sdk/eventhub/event-hubs/src/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { MessagingError, Constants } from "@azure/amqp-common";
import { StreamingReceiver, ReceiveHandler } from "./streamingReceiver";
import { BatchingReceiver } from "./batchingReceiver";
import { Aborter } from "./aborter";
import { throwErrorIfConnectionClosed } from "./util/error";

/**
* Options to pass when creating an iterator to iterate over events
Expand Down Expand Up @@ -47,6 +48,8 @@ export class Receiver {

private _partitionId: string;
private _receiverOptions: ReceiverOptions;
private _streamingReceiver: StreamingReceiver | undefined;
private _batchingReceiver: BatchingReceiver | undefined;

/**
* @property Returns `true` if the receiver is closed. This can happen either because the receiver
Expand Down Expand Up @@ -103,10 +106,10 @@ export class Receiver {
* @returns {ReceiveHandler} ReceiveHandler - An object that provides a mechanism to stop receiving more messages.
*/
receive(onMessage: OnMessage, onError: OnError, cancellationToken?: Aborter): ReceiveHandler {
const sReceiver = StreamingReceiver.create(this._context, this.partitionId, this._receiverOptions);
sReceiver.prefetchCount = Constants.defaultPrefetchCount;
this._context.receivers[sReceiver.name] = sReceiver;
return sReceiver.receive(onMessage, onError);
this._throwIfReceiverOrConnectionClosed();
this._streamingReceiver = StreamingReceiver.create(this._context, this.partitionId, this._receiverOptions);
this._streamingReceiver.prefetchCount = Constants.defaultPrefetchCount;
return this._streamingReceiver.receive(onMessage, onError);
}

/**
Expand All @@ -122,14 +125,37 @@ export class Receiver {
/**
* Closes the underlying AMQP receiver link.
* Once closed, the receiver cannot be used for any further operations.
* Use the `createReceiver` function on the QueueClient or SubscriptionClient to instantiate
* Use the `createReceiver` function on the EventHubClient to instantiate
* a new Receiver
*
* @returns {Promise<void>}
*/
async close(): Promise<void> {
this._isClosed = true;
try {
if (this._context.connection && this._context.connection.isOpen()) {
// Close the streaming receiver.
if (this._streamingReceiver) {
await this._streamingReceiver.close();
}

// Close the batching receiver.
if (this._batchingReceiver) {
await this._batchingReceiver.close();
}
}
} catch (err) {
log.error(
"[%s] An error occurred while closing the Receiver for %s: %O",
this._context.connectionId,
this._context.config.entityPath,
err
);
throw err;
} finally {
this._isClosed = true;
}
}

/**
* Indicates whether the receiver is currently receiving messages or not.
* When this returns true, new `registerMessageHandler()` or `receiveMessages()` calls cannot be made.
Expand All @@ -154,25 +180,25 @@ export class Receiver {
maxWaitTimeInSeconds?: number,
cancellationToken?: Aborter
): Promise<ReceivedEventData[]> {
const bReceiver = BatchingReceiver.create(this._context, this.partitionId, this._receiverOptions);
this._context.receivers[bReceiver.name] = bReceiver;
this._throwIfReceiverOrConnectionClosed();
this._batchingReceiver = BatchingReceiver.create(this._context, this.partitionId, this._receiverOptions);
let error: MessagingError | undefined;
let result: ReceivedEventData[] = [];
try {
result = await bReceiver.receive(maxMessageCount, maxWaitTimeInSeconds);
result = await this._batchingReceiver.receive(maxMessageCount, maxWaitTimeInSeconds);
} catch (err) {
error = err;
log.error(
"[%s] Receiver '%s', an error occurred while receiving %d messages for %d max time:\n %O",
this._context.connectionId,
bReceiver.name,
this._batchingReceiver.name,
maxMessageCount,
maxWaitTimeInSeconds,
err
);
}
try {
await bReceiver.close();
await this._batchingReceiver.close();
} catch (err) {
// do nothing about it.
}
Expand All @@ -181,4 +207,16 @@ export class Receiver {
}
return result;
}

private _throwIfReceiverOrConnectionClosed(): void {
throwErrorIfConnectionClosed(this._context);
if (this.isClosed) {
const errorMessage =
`The receiver for "${this._context.config.entityPath}" has been closed and can no longer be used. ` +
`Please create a new receiver using the "createReceiver" function on the EventHubClient.`;
const error = new Error(errorMessage);
log.error(`[${this._context.connectionId}] %O`, error);
throw error;
}
}
}
9 changes: 2 additions & 7 deletions sdk/eventhub/event-hubs/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,8 @@ export class Sender {
*/
async close(): Promise<void> {
try {
if (
this._context.connection &&
this._context.connection.isOpen() &&
this._eventHubSender &&
this._context.senders[this._eventHubSender.name]
) {
await this._context.senders[this._eventHubSender.name].close();
if (this._context.connection && this._context.connection.isOpen() && this._eventHubSender) {
await this._eventHubSender.close();
}
this._isClosed = true;
} catch (err) {
Expand Down
Loading