Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Test err scenarios against EventHubConsumerClient, not EventHubConsumer #9521

Merged
merged 8 commits into from
Jun 17, 2020
5 changes: 3 additions & 2 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ export class EventHubConsumerClient {
options
));
} else if (
typeof handlersOrPartitionId1 === "string" &&
isSubscriptionEventHandlers(optionsOrHandlers2)
) {
// #2: subscribe overload (read from specific partition IDs), don't coordinate
Expand All @@ -416,7 +415,9 @@ export class EventHubConsumerClient {
validateEventPositions(options.startPosition);
}
({ targetedPartitionId, eventProcessor } = this.createEventProcessorForSinglePartition(
handlersOrPartitionId1,
// cast to string as downstream code expects partitionId to be string, but JS users could have given us anything.
// we don't validate the user input and instead rely on service throwing errors if any
String(handlersOrPartitionId1),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So are we literally saying we take anything or do we have some constraints? For instance, is it just numbers and strings? Can we encode that into the typescript signature (number | string), for instance?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ask this because whenever I see an arbitrary string-ization I'm curious why.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have no constraints. This follows the model of input validation where we try our best to make the connection/request to the service and service is responsible for giving any errors

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put a comment on there? I "reverted" it not knowing what the purpose of it was - might be nice for future programmers to know the same.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Consider this entire convo a 'NIT', btw. Feel free to proceed)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done with a0cd335

optionsOrHandlers2,
possibleOptions3
));
Expand Down
20 changes: 1 addition & 19 deletions sdk/eventhub/event-hubs/src/impl/eventHubClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { ConnectionContext } from "../connectionContext";
import { EventHubProperties, PartitionProperties } from "../managementClient";
import { EventPosition } from "../eventPosition";
import { EventHubConsumer } from "../receiver";
import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "../util/error";
import { throwErrorIfConnectionClosed } from "../util/error";
import {
EventHubClientOptions,
GetEventHubPropertiesOptions,
Expand Down Expand Up @@ -203,24 +203,6 @@ export class EventHubClient {
options.retryOptions = this._clientOptions.retryOptions;
}
throwErrorIfConnectionClosed(this._context);
throwTypeErrorIfParameterMissing(
this._context.connectionId,
"createConsumer",
"consumerGroup",
consumerGroup
);
throwTypeErrorIfParameterMissing(
this._context.connectionId,
"createConsumer",
"partitionId",
partitionId
);
throwTypeErrorIfParameterMissing(
this._context.connectionId,
"createConsumer",
"eventPosition",
eventPosition
);
partitionId = String(partitionId);
return new EventHubConsumer(this._context, consumerGroup, partitionId, eventPosition, options);
}
Expand Down
14 changes: 0 additions & 14 deletions sdk/eventhub/event-hubs/src/impl/partitionGate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ export class PartitionGate {
* @param partitionId A partition ID or the constant "all"
*/
add(partitionId: string | "all") {
this._validatePartitionId(partitionId);

if (
(partitionId === "all" && this._partitions.size > 0) ||
this._partitions.has(partitionId) ||
Expand All @@ -43,16 +41,4 @@ export class PartitionGate {
remove(partitionId: string | "all") {
this._partitions.delete(partitionId);
}

private _validatePartitionId(partitionId: string) {
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
if (partitionId === "all") {
return;
}

const partitionNumber = parseInt(partitionId, 10);

if (isNaN(partitionNumber)) {
throw new TypeError(`Invalid partition number ${partitionId}`);
}
}
}
7 changes: 0 additions & 7 deletions sdk/eventhub/event-hubs/test/impl/partitionGate.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,4 @@ describe("PartitionGate", () => {
should.throw(() => gate.add("all"), /Partition already has a subscriber/);
should.throw(() => gate.add("0"), /Partition already has a subscriber/);
});

it("invalid IDs get thrown out", () => {
const gate = new PartitionGate();

should.throw(() => gate.add("allo"), "Invalid partition number allo");
should.throw(() => gate.add("woo"), "Invalid partition number woo");
});
});
235 changes: 71 additions & 164 deletions sdk/eventhub/event-hubs/test/receiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import {
latestEventPosition,
earliestEventPosition,
EventHubConsumerClient,
EventHubProducerClient
EventHubProducerClient,
Subscription
} from "../src";
import { EventHubClient } from "../src/impl/eventHubClient";
import { EnvVarKeys, getEnvVars } from "./utils/testUtils";
Expand All @@ -30,7 +31,8 @@ describe("EventHub Receiver", function(): void {
path: env[EnvVarKeys.EVENTHUB_NAME]
};
const client = new EventHubClient(service.connectionString, service.path);
const producerClient = new EventHubProducerClient(service.connectionString, service.path);
let producerClient: EventHubProducerClient;
let consumerClient: EventHubConsumerClient;

let receiver: EventHubConsumer | undefined;
let partitionIds: string[];
Expand All @@ -51,7 +53,18 @@ describe("EventHub Receiver", function(): void {
await producerClient.close();
});

afterEach("close the receiver link", async function(): Promise<void> {
beforeEach("Creating the clients", async () => {
producerClient = new EventHubProducerClient(service.connectionString, service.path);
consumerClient = new EventHubConsumerClient(
EventHubConsumerClient.defaultConsumerGroupName,
service.connectionString,
service.path
);
});

afterEach("Closing the clients", async () => {
await producerClient.close();
await consumerClient.close();
if (receiver && !receiver.isClosed) {
await receiver.close();
debug("After each - Receiver closed.");
Expand All @@ -61,10 +74,26 @@ describe("EventHub Receiver", function(): void {

describe("with partitionId 0 as number", function(): void {
it("should not throw an error", async function(): Promise<void> {
receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, 0 as any, {
sequenceNumber: 0
let subscription: Subscription | undefined;
await new Promise((resolve, reject) => {
subscription = consumerClient.subscribe(
// @ts-expect-error
0,
{
processEvents: async () => {
resolve();
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
},
processError: async (err) => {
reject(err);
}
},
{
startPosition: latestEventPosition,
maxWaitTimeInSeconds: 0 // Set timeout of 0 to resolve the promise ASAP
}
);
});
await receiver.receiveBatch(10, 20);
await subscription!.close();
});
});

Expand Down Expand Up @@ -574,53 +603,6 @@ describe("EventHub Receiver", function(): void {
});
});

describe("Errors when calling createConsumer", function(): void {
it("should throw an error if EventPosition is missing", function() {
try {
client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, "0", undefined as any);
throw new Error("Test failure");
} catch (err) {
err.name.should.equal("TypeError");
err.message.should.equal(`createConsumer called without required argument "eventPosition"`);
}
});

it("should throw an error if consumerGroup is missing", function() {
try {
client.createConsumer(undefined as any, "0", earliestEventPosition);
throw new Error("Test failure");
} catch (err) {
err.name.should.equal("TypeError");
err.message.should.equal(`createConsumer called without required argument "consumerGroup"`);
}
});

it("should throw MessagingEntityNotFoundError fr non existing consumer group", function(done: Mocha.Done): void {
try {
debug(">>>>>>>> client created.");
const onMessage = (data: any) => {
debug(">>>>> data: ", data);
};
const onError = (error: any) => {
debug(">>>>>>>> error occurred", error);
// sleep for 3 seconds so that receiver link and the session can be closed properly then
// in aftereach the connection can be closed. closing the connection while the receiver
// link and it's session are being closed (and the session being removed from rhea's
// internal map) can create havoc.
setTimeout(() => {
done(should.equal(error.code, "MessagingEntityNotFoundError"));
}, 3000);
};
receiver = client.createConsumer("some-random-name", "0", earliestEventPosition);
receiver.receive(onMessage, onError);
debug(">>>>>>>> attached the error handler on the receiver...");
} catch (err) {
debug(">>> Some error", err);
throw new Error("This code path must not have hit.. " + JSON.stringify(err));
}
});
});

describe("with trackLastEnqueuedEventProperties", function(): void {
it("should have lastEnqueuedEventProperties populated", async function(): Promise<void> {
const partitionId = partitionIds[0];
Expand Down Expand Up @@ -891,123 +873,48 @@ describe("EventHub Receiver", function(): void {
});

describe("Negative scenarios", function(): void {
describe("on invalid partition ids like", function(): void {
const invalidIds = ["XYZ", "-1", "1000", "-"];
invalidIds.forEach(function(id: string): void {
it(`"${id}" should throw an error`, async function(): Promise<void> {
try {
debug("Created receiver and will be receiving messages from partition id ...", id);
const d = await client
.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, id, latestEventPosition)
.receiveBatch(10, 3);
debug("received messages ", d.length);
throw new Error("Test failure");
} catch (err) {
debug("Receiver received an error", err);
should.exist(err);
err.message.should.match(
/.*The specified partition is invalid for an EventHub partition sender or receiver.*/gi
);
}
});
it("should throw MessagingEntityNotFoundError for non existing consumer group", async function(): Promise<
void
> {
const badConsumerClient = new EventHubConsumerClient(
"boo",
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
service.connectionString,
service.path
);
let subscription: Subscription | undefined;
const caughtErr = await new Promise<Error | MessagingError>((resolve) => {
subscription = badConsumerClient.subscribe(
{
processEvents: async () => {},
processError: async (err) => {
resolve(err);
}
},
{ maxWaitTimeInSeconds: 0 } // TODO: Remove after https://github.com/Azure/azure-sdk-for-js/pull/9543 is merged
);
});
await subscription!.close();
await badConsumerClient.close();

it(`" " should throw an invalid EventHub address error`, async function(): Promise<void> {
try {
const id = " ";
debug("Created receiver and will be receiving messages from partition id ...", id);
const d = await client
.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, id, latestEventPosition)
.receiveBatch(10, 3);
debug("received messages ", d.length);
throw new Error("Test failure");
} catch (err) {
debug("Receiver received an error", err);
should.exist(err);
err.message.should.match(
/.*Invalid EventHub address. It must be either of the following.*/gi
);
}
});
should.exist(caughtErr);
should.equal((caughtErr as MessagingError).code, "MessagingEntityNotFoundError");
});

const invalidIds2 = [""];
invalidIds2.forEach(function(id: string): void {
it(`"${id}" should throw an error`, async function(): Promise<void> {
try {
await client
.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, id, latestEventPosition)
.receiveBatch(10, 3);
throw new Error("Test failure");
} catch (err) {
debug(`>>>> Received error - `, err);
should.exist(err);
it(`should throw an invalid EventHub address error for invalid partition`, async function(): Promise<
void
> {
let subscription: Subscription | undefined;
const caughtErr = await new Promise<Error | MessagingError>((resolve) => {
subscription = consumerClient.subscribe("boo", {
processEvents: async () => {},
processError: async (err) => {
resolve(err);
}
});
});
});

it("should receive 'QuotaExceededError' when attempting to connect more than 5 receivers to a partition in a consumer group", function(done: Mocha.Done): void {
const partitionId = partitionIds[0];
const rcvHndlrs: ReceiveHandler[] = [];
const rcvrs: any[] = [];

// This test does not require recieving any messages. Just attempting to connect the 6th receiver causes
// onerr2() to be called with QuotaExceededError. So it's fastest to use latestEventPosition.
// Using EventPosition.earliestEventPosition() can cause timeouts or ServiceUnavailableException if the EventHub has
// a large number of messages.
const eventPosition = latestEventPosition;

debug(">>> Receivers length: ", rcvHndlrs.length);
for (let i = 1; i <= 5; i++) {
const rcvrId = `rcvr-${i}`;
debug(rcvrId);
const onMsg = (_data: ReceivedEventData) => {
if (!rcvrs[i]) {
rcvrs[i] = rcvrId;
debug("receiver id %s", rcvrId);
}
};
const onError = (err: MessagingError | Error) => {
debug("@@@@ Error received by receiver %s", rcvrId);
debug(err);
};
const rcvHndlr = client
.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, eventPosition)
.receive(onMsg, onError);
rcvHndlrs.push(rcvHndlr);
}
debug(">>> Attached message handlers to each receiver.");
setTimeout(() => {
debug(`Created 6th receiver - "rcvr-6"`);
const onmsg2 = () => {
// debug(data);
};
const onerr2 = (err: MessagingError | Error) => {
debug("@@@@ Error received by receiver rcvr-6");
debug(err);
should.equal((err as any).code, "QuotaExceededError");
const promises = [];
for (const rcvr of rcvHndlrs) {
promises.push(rcvr.stop());
}
Promise.all(promises)
.then(() => {
debug("Successfully closed all the receivers..");
done();
})
.catch((err) => {
debug(
"An error occurred while closing the receiver in the 'QuotaExceededError' test.",
err
);
done();
});
};
const failedRcvHandler = client
.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, eventPosition)
.receive(onmsg2, onerr2);
rcvHndlrs.push(failedRcvHandler);
}, 5000);
await subscription!.close();
should.exist(caughtErr);
should.equal((caughtErr as MessagingError).code, "ArgumentOutOfRangeError");
});
});
}).timeout(90000);