diff --git a/sdk/eventhub/event-hubs/test/misc.spec.ts b/sdk/eventhub/event-hubs/test/misc.spec.ts index 5685c2bd415d..19b0cd73cac6 100644 --- a/sdk/eventhub/event-hubs/test/misc.spec.ts +++ b/sdk/eventhub/event-hubs/test/misc.spec.ts @@ -17,14 +17,12 @@ import { ReceivedEventData, Subscription } from "../src"; -import { EventHubClient } from "../src/impl/eventHubClient"; import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; import { TRACEPARENT_PROPERTY, extractSpanContextFromEventData } from "../src/diagnostics/instrumentEventData"; import { TraceFlags } from "@opentelemetry/api"; -import { EventHubConsumer } from "../src/receiver"; import { SubscriptionHandlerForTests } from "./utils/subscriptionHandlerForTests"; const env = getEnvVars(); @@ -33,10 +31,12 @@ describe("Misc tests", function(): void { connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], path: env[EnvVarKeys.EVENTHUB_NAME] }; - const client: EventHubClient = new EventHubClient(service.connectionString, service.path); - const producerClient = new EventHubProducerClient(service.connectionString, service.path); - let receiver: EventHubConsumer; + let consumerClient: EventHubConsumerClient; + let producerClient: EventHubProducerClient; let hubInfo: EventHubProperties; + let partitionId: string; + let lastEnqueuedOffset: number; + before("validate environment", async function(): Promise { should.exist( env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], @@ -46,46 +46,68 @@ describe("Misc tests", function(): void { env[EnvVarKeys.EVENTHUB_NAME], "define EVENTHUB_NAME in your environment before running integration tests." ); - hubInfo = await client.getProperties(); }); - after("close the connection", async function(): Promise { - await client.close(); + beforeEach(async () => { + debug("Creating the clients.."); + producerClient = new EventHubProducerClient(service.connectionString, service.path); + consumerClient = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString, + service.path + ); + hubInfo = await consumerClient.getEventHubProperties(); + partitionId = hubInfo.partitionIds[0]; + lastEnqueuedOffset = (await consumerClient.getPartitionProperties(partitionId)) + .lastEnqueuedOffset; + }); + + afterEach(async () => { + debug("Closing the clients.."); await producerClient.close(); + await consumerClient.close(); }); it("should be able to send and receive a large message correctly", async function(): Promise< void > { const bodysize = 220 * 1024; - const partitionId = hubInfo.partitionIds[0]; const msgString = "A".repeat(220 * 1024); const msgBody = Buffer.from(msgString); const obj: EventData = { body: msgBody }; - const offset = (await client.getPartitionProperties(partitionId)).lastEnqueuedOffset; - debug(`Partition ${partitionId} has last message with offset ${offset}.`); - debug("Sending one message with %d bytes.", bodysize); - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - offset - }); - let data = await receiver.receiveBatch(1, 1); - should.equal(data.length, 0, "Unexpected to receive message before client sends it"); + debug(`Partition ${partitionId} has last message with offset ${lastEnqueuedOffset}.`); + debug("Sending one message with %d bytes.", bodysize); await producerClient.sendBatch([obj], { partitionId }); debug("Successfully sent the large message."); - data = await receiver.receiveBatch(1, 30); - debug("Closing the receiver.."); - await receiver.close(); - debug("received message: ", data.length); - should.exist(data); - should.equal(data.length, 1); - should.equal(data[0].body.toString(), msgString); - should.not.exist((data[0].properties || {}).message_id); + + let subscription: Subscription | undefined; + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + debug("received message: ", data.length); + should.exist(data); + should.equal(data.length, 1); + should.equal(data[0].body.toString(), msgString); + should.not.exist((data[0].properties || {}).message_id); + resolve(); + }, + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { offset: lastEnqueuedOffset } + } + ); + }); + await subscription!.close(); }); it("should be able to send and receive a JSON object as a message correctly", async function(): Promise< void > { - const partitionId = hubInfo.partitionIds[0]; const msgBody = { id: "123-456-789", weight: 10, @@ -99,28 +121,39 @@ describe("Misc tests", function(): void { ] }; const obj: EventData = { body: msgBody }; - const offset = (await client.getPartitionProperties(partitionId)).lastEnqueuedOffset; - debug(`Partition ${partitionId} has last message with offset ${offset}.`); + debug(`Partition ${partitionId} has last message with offset ${lastEnqueuedOffset}.`); debug("Sending one message %O", obj); - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - offset - }); await producerClient.sendBatch([obj], { partitionId }); debug("Successfully sent the large message."); - const data = await receiver.receiveBatch(1, 30); - await receiver.close(); - debug("received message: ", data); - should.exist(data); - should.equal(data.length, 1); - debug("Received message: %O", data); - assert.deepEqual(data[0].body, msgBody); - should.not.exist((data[0].properties || {}).message_id); + + let subscription: Subscription | undefined; + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + debug("received message: ", data.length); + should.exist(data); + should.equal(data.length, 1); + assert.deepEqual(data[0].body, msgBody); + should.not.exist((data[0].properties || {}).message_id); + resolve(); + }, + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { offset: lastEnqueuedOffset } + } + ); + }); + await subscription!.close(); }); it("should be able to send and receive an array as a message correctly", async function(): Promise< void > { - const partitionId = hubInfo.partitionIds[0]; const msgBody = [ { id: "098-789-564", @@ -132,129 +165,170 @@ describe("Misc tests", function(): void { "some string" ]; const obj: EventData = { body: msgBody, properties: { message_id: uuid() } }; - const offset = (await client.getPartitionProperties(partitionId)).lastEnqueuedOffset; - debug(`Partition ${partitionId} has last message with offset ${offset}.`); + debug(`Partition ${partitionId} has last message with offset ${lastEnqueuedOffset}.`); debug("Sending one message %O", obj); - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - offset - }); await producerClient.sendBatch([obj], { partitionId }); debug("Successfully sent the large message."); - const data = await receiver.receiveBatch(1, 30); - await receiver.close(); - debug("received message: ", data); - should.exist(data); - should.equal(data.length, 1); - debug("Received message: %O", data); - assert.deepEqual(data[0].body, msgBody); - assert.strictEqual(data[0].properties!.message_id, obj.properties!.message_id); + + let subscription: Subscription | undefined; + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + debug("received message: ", data.length); + should.exist(data); + should.equal(data.length, 1); + assert.deepEqual(data[0].body, msgBody); + assert.strictEqual(data[0].properties!.message_id, obj.properties!.message_id); + resolve(); + }, + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { offset: lastEnqueuedOffset } + } + ); + }); + await subscription!.close(); }); it("should be able to send a boolean as a message correctly", async function(): Promise { - const partitionId = hubInfo.partitionIds[0]; const msgBody = true; const obj: EventData = { body: msgBody }; - const offset = (await client.getPartitionProperties(partitionId)).lastEnqueuedOffset; - debug(`Partition ${partitionId} has last message with offset ${offset}.`); + debug(`Partition ${partitionId} has last message with offset ${lastEnqueuedOffset}.`); debug("Sending one message %O", obj); - receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - offset - }); await producerClient.sendBatch([obj], { partitionId }); debug("Successfully sent the large message."); - const data = await receiver.receiveBatch(1, 30); - await receiver.close(); - debug("received message: ", data); - should.exist(data); - should.equal(data.length, 1); - debug("Received message: %O", data); - assert.deepEqual(data[0].body, msgBody); - should.not.exist((data[0].properties || {}).message_id); + + let subscription: Subscription | undefined; + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + debug("received message: ", data.length); + should.exist(data); + should.equal(data.length, 1); + assert.deepEqual(data[0].body, msgBody); + should.not.exist((data[0].properties || {}).message_id); + resolve(); + }, + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { offset: lastEnqueuedOffset } + } + ); + }); + await subscription!.close(); }); it("should be able to send and receive batched messages correctly ", async function(): Promise< void > { - try { - const partitionId = hubInfo.partitionIds[0]; - const offset = (await client.getPartitionProperties(partitionId)).lastEnqueuedOffset; - debug(`Partition ${partitionId} has last message with offset ${offset}.`); - const messageCount = 5; - const d: EventData[] = []; - for (let i = 0; i < messageCount; i++) { - const obj: EventData = { body: `Hello EH ${i}` }; - d.push(obj); - } + debug(`Partition ${partitionId} has last message with offset ${lastEnqueuedOffset}.`); + const messageCount = 5; + const d: EventData[] = []; + for (let i = 0; i < messageCount; i++) { + const obj: EventData = { body: `Hello EH ${i}` }; + d.push(obj); + } - await producerClient.sendBatch(d, { partitionId }); - debug("Successfully sent 5 messages batched together."); + await producerClient.sendBatch(d, { partitionId }); + debug("Successfully sent 5 messages batched together."); - const receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - offset - }); - const data = await receiver.receiveBatch(5, 30); - await receiver.close(); - debug("received message: ", data); - should.exist(data); - data.length.should.equal(5); - for (const message of data) { - should.not.exist((message.properties || {}).message_id); - } - } catch (err) { - debug("should not have happened, uber catch....", err); - throw err; + let subscription: Subscription | undefined; + const receivedMsgs: ReceivedEventData[] = []; + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + debug("received message: ", data.length); + receivedMsgs.push(...data); + if (receivedMsgs.length === 5) { + resolve(); + } + }, + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { offset: lastEnqueuedOffset } + } + ); + }); + await subscription!.close(); + receivedMsgs.length.should.equal(5); + for (const message of receivedMsgs) { + should.not.exist((message.properties || {}).message_id); } }); it("should be able to send and receive batched messages as JSON objects correctly ", async function(): Promise< void > { - try { - const partitionId = hubInfo.partitionIds[0]; - const offset = (await client.getPartitionProperties(partitionId)).lastEnqueuedOffset; - debug(`Partition ${partitionId} has last message with offset ${offset}.`); - const messageCount = 5; - const d: EventData[] = []; - for (let i = 0; i < messageCount; i++) { - const obj: EventData = { - body: { - id: "123-456-789", - count: i, - weight: 10, - isBlue: true, - siblings: [ - { - id: "098-789-564", - weight: 20, - isBlue: false - } - ] - }, - properties: { - message_id: uuid() - } - }; - d.push(obj); - } + debug(`Partition ${partitionId} has last message with offset ${lastEnqueuedOffset}.`); + const messageCount = 5; + const d: EventData[] = []; + for (let i = 0; i < messageCount; i++) { + const obj: EventData = { + body: { + id: "123-456-789", + count: i, + weight: 10, + isBlue: true, + siblings: [ + { + id: "098-789-564", + weight: 20, + isBlue: false + } + ] + }, + properties: { + message_id: uuid() + } + }; + d.push(obj); + } - await producerClient.sendBatch(d, { partitionId }); - debug("Successfully sent 5 messages batched together."); + await producerClient.sendBatch(d, { partitionId }); + debug("Successfully sent 5 messages batched together."); - const receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, { - offset - }); - const data = await receiver.receiveBatch(5, 30); - await receiver.close(); - debug("received message: ", data); - should.exist(data); - should.equal(data[0].body.count, 0); - should.equal(data.length, 5); - for (const [index, message] of data.entries()) { - assert.strictEqual(message.properties!.message_id, d[index].properties!.message_id); - } - } catch (err) { - debug("should not have happened, uber catch....", err); - throw err; + let subscription: Subscription | undefined; + const receivedMsgs: ReceivedEventData[] = []; + await new Promise((resolve, reject) => { + subscription = consumerClient.subscribe( + partitionId, + { + processEvents: async (data) => { + debug("received message: ", data.length); + receivedMsgs.push(...data); + if (receivedMsgs.length === 5) { + resolve(); + } + }, + processError: async (err) => { + reject(err); + } + }, + { + startPosition: { offset: lastEnqueuedOffset } + } + ); + }); + await subscription!.close(); + should.equal(receivedMsgs[0].body.count, 0); + should.equal(receivedMsgs.length, 5); + for (const [index, message] of receivedMsgs.entries()) { + assert.strictEqual(message.properties!.message_id, d[index].properties!.message_id); } }); @@ -327,73 +401,73 @@ describe("Misc tests", function(): void { await consumerClient.close(); } }); +}).timeout(60000); - describe("extractSpanContextFromEventData", function() { - it("should extract a SpanContext from a properly instrumented EventData", function() { - const traceId = "11111111111111111111111111111111"; - const spanId = "2222222222222222"; - const flags = "00"; - const eventData: ReceivedEventData = { - body: "This is a test.", - enqueuedTimeUtc: new Date(), - offset: 0, - sequenceNumber: 0, - partitionKey: null, - properties: { - [TRACEPARENT_PROPERTY]: `00-${traceId}-${spanId}-${flags}` - } - }; +describe("extractSpanContextFromEventData", function() { + it("should extract a SpanContext from a properly instrumented EventData", function() { + const traceId = "11111111111111111111111111111111"; + const spanId = "2222222222222222"; + const flags = "00"; + const eventData: ReceivedEventData = { + body: "This is a test.", + enqueuedTimeUtc: new Date(), + offset: 0, + sequenceNumber: 0, + partitionKey: null, + properties: { + [TRACEPARENT_PROPERTY]: `00-${traceId}-${spanId}-${flags}` + } + }; - const spanContext = extractSpanContextFromEventData(eventData); + const spanContext = extractSpanContextFromEventData(eventData); - should.exist(spanContext, "Extracted spanContext should be defined."); - should.equal(spanContext!.traceId, traceId, "Extracted traceId does not match expectation."); - should.equal(spanContext!.spanId, spanId, "Extracted spanId does not match expectation."); - should.equal( - spanContext!.traceFlags, - TraceFlags.NONE, - "Extracted traceFlags do not match expectations." - ); - }); + should.exist(spanContext, "Extracted spanContext should be defined."); + should.equal(spanContext!.traceId, traceId, "Extracted traceId does not match expectation."); + should.equal(spanContext!.spanId, spanId, "Extracted spanId does not match expectation."); + should.equal( + spanContext!.traceFlags, + TraceFlags.NONE, + "Extracted traceFlags do not match expectations." + ); + }); - it("should return undefined when EventData is not properly instrumented", function() { - const traceId = "11111111111111111111111111111111"; - const spanId = "2222222222222222"; - const flags = "00"; - const eventData: ReceivedEventData = { - body: "This is a test.", - enqueuedTimeUtc: new Date(), - offset: 0, - sequenceNumber: 0, - partitionKey: null, - properties: { - [TRACEPARENT_PROPERTY]: `99-${traceId}-${spanId}-${flags}` - } - }; + it("should return undefined when EventData is not properly instrumented", function() { + const traceId = "11111111111111111111111111111111"; + const spanId = "2222222222222222"; + const flags = "00"; + const eventData: ReceivedEventData = { + body: "This is a test.", + enqueuedTimeUtc: new Date(), + offset: 0, + sequenceNumber: 0, + partitionKey: null, + properties: { + [TRACEPARENT_PROPERTY]: `99-${traceId}-${spanId}-${flags}` + } + }; - const spanContext = extractSpanContextFromEventData(eventData); + const spanContext = extractSpanContextFromEventData(eventData); - should.not.exist( - spanContext, - "Invalid diagnosticId version should return undefined spanContext." - ); - }); + should.not.exist( + spanContext, + "Invalid diagnosticId version should return undefined spanContext." + ); + }); - it("should return undefined when EventData is not instrumented", function() { - const eventData: ReceivedEventData = { - body: "This is a test.", - enqueuedTimeUtc: new Date(), - offset: 0, - sequenceNumber: 0, - partitionKey: null - }; + it("should return undefined when EventData is not instrumented", function() { + const eventData: ReceivedEventData = { + body: "This is a test.", + enqueuedTimeUtc: new Date(), + offset: 0, + sequenceNumber: 0, + partitionKey: null + }; - const spanContext = extractSpanContextFromEventData(eventData); + const spanContext = extractSpanContextFromEventData(eventData); - should.not.exist( - spanContext, - `Missing property "${TRACEPARENT_PROPERTY}" should return undefined spanContext.` - ); - }); + should.not.exist( + spanContext, + `Missing property "${TRACEPARENT_PROPERTY}" should return undefined spanContext.` + ); }); -}).timeout(60000); +});