Skip to content

Commit

Permalink
Merge pull request Azure#95 from amarzavery/recovery
Browse files Browse the repository at this point in the history
Recovery
  • Loading branch information
amarzavery authored Jul 13, 2018
2 parents e04dc39 + 4de83ff commit 1fe169b
Show file tree
Hide file tree
Showing 10 changed files with 2,848 additions and 28 deletions.
40 changes: 38 additions & 2 deletions client/tests/receiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,43 @@ describe("EventHub Receiver", function () {
// });

describe("with epoch", function () {
it("should behave correctly when 2 epoch receivers with different values are connecting to a partition in a consumer group", function (done) {
it("should behave correctly when a receiver with lower epoch value is connected after a receiver with higher epoch value to a partition in a consumer group", function (done) {
const partitionId = hubInfo.partitionIds[0];
let epochRcvr1: ReceiveHandler, epochRcvr2: ReceiveHandler
const onError = (error) => {
debug(">>>> epoch Receiver 1", error);
throw new Error("An Error should not have happened for epoch receiver with epoch value 2.");
};
const onMsg = (data) => {
debug(">>>> epoch Receiver 1", data);
};
epochRcvr1 = client.receive(partitionId, onMsg, onError, { epoch: 2, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver 1 %s", epochRcvr1.name);
setTimeout(() => {
const onError2 = (error) => {
debug(">>>> epoch Receiver 2", error);
should.exist(error);
should.equal(error.name, "ReceiverDisconnectedError");
epochRcvr2.stop()
.then(() => epochRcvr1.stop())
.then(() => {
debug("Successfully closed the epoch receivers 1 and 2.");
done();
})
.catch((err) => {
debug("error occurred while closing the receivers... ", err);
done();
});
};
const onMsg2 = (data) => {
debug(">>>> epoch Receiver 2", data);
};
epochRcvr2 = client.receive(partitionId, onMsg, onError2, { epoch: 1, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver 2 %s", epochRcvr2.name);
}, 3000);
});

it("should behave correctly when a receiver with higher epoch value is connected after a receiver with lower epoch value to a partition in a consumer group", function (done) {
const partitionId = hubInfo.partitionIds[0];
let epochRcvr1: ReceiveHandler, epochRcvr2: ReceiveHandler
const onError = (error) => {
Expand Down Expand Up @@ -286,7 +322,7 @@ describe("EventHub Receiver", function () {
const onMsg2 = (data) => {
debug(">>>> epoch Receiver 2", data);
};
epochRcvr2 = client.receive(partitionId, onMsg, onError, { epoch: 2, eventPosition: EventPosition.fromEnd() });
epochRcvr2 = client.receive(partitionId, onMsg, onError2, { epoch: 2, eventPosition: EventPosition.fromEnd() });
debug("Created epoch receiver 2 %s", epochRcvr2.name);
}, 3000);
});
Expand Down
2 changes: 1 addition & 1 deletion processor/lib/blobLeaseManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ export class BlobLeaseManager extends EventEmitter implements LeaseManager {
debug("[%s] Lease '%s' lost. Attempting to re-acquire.",
this.hostName, lease.fullUri);
this._acquire(lease).catch((err) => {
debug("[%s] An error occured while acquiring the lease '%s': %O",
debug("[%s] An error occured while re-acquiring the lease '%s': %O",
this.hostName, lease.fullUri, err);
});
}, renewPeriod * 2);
Expand Down
4 changes: 2 additions & 2 deletions processor/lib/eventProcessorHost.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { PartitionContext } from "./partitionContext";
import { EventEmitter } from "events";
import {
TokenProvider, EventHubRuntimeInformation, EventHubPartitionRuntimeInformation,
ReceiveOptions, EventPosition, OnMessage, OnError, EventHubsError, EventHubClient, ClientOptions,
ReceiveOptions, EventPosition, OnMessage, OnError, MessagingError, EventHubClient, ClientOptions,
Dictionary, EventData, ReceiveHandler, ClientOptionsBase
} from "azure-event-hubs";
import {
Expand Down Expand Up @@ -346,7 +346,7 @@ export class EventProcessorHost extends EventEmitter {
context.updateCheckpointDataFromEventData(eventData);
this.emit(EventProcessorHost.message, context, eventData);
};
const onError: OnError = (error: EventHubsError | Error) => {
const onError: OnError = (error: MessagingError | Error) => {
debug("[%s] [EPH - '%s'] Receiver '%s' received an error: %O.",
this._eventHubClient.connectionId, this._hostName, receiveHandler.name, error);
this.emit(EventProcessorHost.error, error);
Expand Down
2 changes: 1 addition & 1 deletion processor/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ export { Lease } from "./blobLease";
export { LeaseManager, LeaseWithDuration } from "./blobLeaseManager";
export {
delay, EventData, OnError as OnEphError, EventPosition, EventHubPartitionRuntimeInformation,
EventHubRuntimeInformation, EventHubsError, DataTransformer
EventHubRuntimeInformation, MessagingError, DataTransformer
} from "azure-event-hubs";
Loading

0 comments on commit 1fe169b

Please sign in to comment.