Skip to content

Commit

Permalink
[Perf] [Service Bus] [Event Hubs] Event Based SDKs Perf Framework - p…
Browse files Browse the repository at this point in the history
…art 2 (Azure#18859)

* clean up old scripts

* tests -> perfTest

* base, event, batch perf test classes

* program.ts

* incorrect comment

* postSetup and preCleanup

* ParsedPerfOptions<TOptions = Record<string, unknown>> and related updates

* remove ! from options

* MockReceiverTest

* import "@azure/abort-controller" at perf classes

* BatchReceiveTest

* remove !

* changelog

* receiveBatch test

* remove aborter

* docs

* BatchSendTest

* receiveMessages

* no need of timer

* - Adds `PerfTestBase` and `BatchPerfTest`.

* lint fixes

* checkpoitn

* checkpoint

* manage parallels in base class

* parallels in base class

* MockEventReceiverTest and the framework

* fix readme

* formatting

* unneeded changes

* EventPerfTest

* mock test

* subscribe test for service-bus

* lint error

* formatting

* simplify

* update desc

* checkpoint

* subscribe-test

* mock event receiver 2

* formatting

* Address Mike's feedback

* lint format

* remove min-max delay

* maxEventsPerSecond logic

* Apply suggestions from code review

* Make EventPerfTest.eventRaise() and errorRaised() sync instead of async
- Methods do not perform any async work, and sync methods add less overhead to perf framework

* Cache event arrays to increase MockEventHubConsumerClient throughput

* Add MockReceiverTest to test list

* Improve throughput of MockReceiverTest
- Now matches NoOpTest * AverageBatchSize
- Also update options to match .NET

* Jeremy's feedback

* Clear the duration timer if the test ended sooner

* Mike's feedbakc

* lint:fix

* minor feedback

Co-authored-by: Mike Harder <mharder@microsoft.com>
  • Loading branch information
HarshaNalluru and mikeharder authored Mar 21, 2022
1 parent aaff27a commit efe321c
Show file tree
Hide file tree
Showing 18 changed files with 751 additions and 51 deletions.
4 changes: 4 additions & 0 deletions sdk/eventhub/perf-tests/event-hubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ Create an event-hubs namespace and populate the `.env` file with the following v
_Note: For more default options, refer [Perf-Framework-Default-Options](https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/test-utils/perf/README.md#keyconcepts)._

- To test `subscribe` - receiving a stream of messages

> `npm run perf-test:node -- SubscribeTest --duration 5`
- To test receiving messages (this test does not use the framework, is a standalone test)

`ts-node test/receive.spec.ts [eventBodySize] [numberOfEvents]`,
Expand Down
3 changes: 2 additions & 1 deletion sdk/eventhub/perf-tests/event-hubs/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

import { PerfProgram, selectPerfTest } from "@azure/test-utils-perf";
import { SendTest } from "./send.spec";
import { SubscribeTest } from "./subscribe.spec";

console.log("=== Starting the perf test ===");

const perfProgram = new PerfProgram(selectPerfTest([SendTest]));
const perfProgram = new PerfProgram(selectPerfTest([SendTest, SubscribeTest]));

perfProgram.run();
136 changes: 136 additions & 0 deletions sdk/eventhub/perf-tests/event-hubs/test/subscribe.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import {
earliestEventPosition,
EventHubConsumerClient,
EventHubProducerClient,
MessagingError,
PartitionContext,
ReceivedEventData,
} from "@azure/event-hubs";
import { PerfOptionDictionary, EventPerfTest, getEnvVar } from "@azure/test-utils-perf";

interface ReceiverOptions {
"number-of-events": number;
"event-size-in-bytes": number;
partitions: number;
"max-batch-size": number;
}

const connectionString = getEnvVar("EVENTHUB_CONNECTION_STRING");
const eventHubName = getEnvVar("EVENTHUB_NAME");
const consumerGroup = getEnvVar("CONSUMER_GROUP_NAME");

const consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

export class SubscribeTest extends EventPerfTest<ReceiverOptions> {
receiver: EventHubConsumerClient;
subscriber: { close: () => Promise<void> } | undefined;

options: PerfOptionDictionary<ReceiverOptions> = {
"number-of-events": {
required: true,
description: "Total number of events to send",
shortName: "events",
longName: "events",
defaultValue: 10000,
},
"event-size-in-bytes": {
required: true,
description: "Size of each event in bytes",
shortName: "size",
longName: "size-in-bytes",
defaultValue: 2000,
},
partitions: {
required: true,
description: "number of partitions to publish, -1 publishes to all partitions",
shortName: "p",
longName: "partitions",
defaultValue: 10,
},
"max-batch-size": {
required: true,
description: "The number of events to request per batch",
shortName: "max-count",
longName: "max-batch-size",
defaultValue: 100,
},
};

constructor() {
super();
this.receiver = consumer;
}

/**
* Sends the messages to be received later.
*/
async globalSetup(): Promise<void> {
const {
"number-of-events": { value: numberOfEvents },
"event-size-in-bytes": { value: eventSize },
} = this.parsedOptions;

await sendBatch(numberOfEvents, eventSize, this.parsedOptions["partitions"].value);
}

setup() {
this.subscriber = this.receiver.subscribe(
{
processEvents: async (events: ReceivedEventData[], _context: PartitionContext) => {
for (const _event of events) {
this.eventRaised();
}
},
processError: async (error: Error | MessagingError, _context: PartitionContext) => {
this.errorRaised(error);
},
},
{
maxBatchSize: this.parsedOptions["max-batch-size"].value,
startPosition: earliestEventPosition,
}
);
}

async cleanup() {
await this.subscriber?.close();
await this.receiver.close();
}

async globalCleanup(): Promise<void> {
await consumer.close();
}
}

async function sendBatch(
numberOfEvents: number,
eventBodySize: number,
partitions: number
): Promise<void> {
const _payload = Buffer.alloc(eventBodySize);
const producer = new EventHubProducerClient(connectionString, eventHubName);
let partitionIds = await producer.getPartitionIds();
const numberOfPartitions =
partitionIds.length < partitions || partitions === -1 ? partitionIds.length : partitions;
partitionIds = partitionIds.slice(0, numberOfPartitions);
const numberOfEventsPerPartition = Math.ceil(numberOfEvents / numberOfPartitions);

for (const partitionId of partitionIds) {
const batch = await producer.createBatch({ partitionId });
let numberOfEventsSent = 0;
// add events to our batch
while (numberOfEventsSent < numberOfEventsPerPartition) {
while (
batch.tryAdd({ body: _payload }) &&
numberOfEventsSent + batch.count <= numberOfEventsPerPartition
);
await producer.sendBatch(batch);
numberOfEventsSent = numberOfEventsSent + batch.count;
}
}

await producer.close();
}
6 changes: 5 additions & 1 deletion sdk/servicebus/perf-tests/service-bus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ To test sending messages in batches

> `npm run perf-test:node -- BatchSendTest --warmup 2 --duration 7 --parallel 2`
> `npm run perf-test:node -- BatchSendTest --warmup 1 --duration 25 --iterations 2 --parallel 32 --messageBodySize 10240 --numberOfMessages 10`
> `npm run perf-test:node -- BatchSendTest --warmup 1 --duration 25 --iterations 2 --parallel 32 --size 10240 --numberOfMessages 10`
To test `receiveMessages` - receiving messages in batches

> `npm run perf-test:node -- BatchReceiveTest --duration 5 --size 2000 --number-of-messages 10000 --size-in-bytes 2000 --max-message-count 50`
To test `subscribe` - receiving a stream of messages

> `npm run perf-test:node -- SubscribeTest --duration 5 --size 2000 --mcc 1000`
_Note: For more default options, refer [Perf-Framework-Default-Options](https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/test-utils/perf/README.md#keyconcepts)._
5 changes: 4 additions & 1 deletion sdk/servicebus/perf-tests/service-bus/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import { PerfProgram, selectPerfTest } from "@azure/test-utils-perf";
import { BatchReceiveTest } from "./receiveBatch.spec";
import { BatchSendTest } from "./sendBatch.spec";
import { SubscribeTest } from "./subscribe.spec";

console.log("=== Starting the perf test ===");

const perfProgram = new PerfProgram(selectPerfTest([BatchSendTest, BatchReceiveTest]));
const perfProgram = new PerfProgram(
selectPerfTest([BatchSendTest, BatchReceiveTest, SubscribeTest])
);

perfProgram.run();
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class BatchReceiveTest extends ServiceBusTest<ReceiverOptions> {
}
}

async function sendMessages(
export async function sendMessages(
sender: ServiceBusSender,
numberOfMessages: number,
messageBodySize: number
Expand Down
93 changes: 93 additions & 0 deletions sdk/servicebus/perf-tests/service-bus/test/subscribe.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import {
ProcessErrorArgs,
ServiceBusReceivedMessage,
ServiceBusReceiver,
} from "@azure/service-bus";
import { PerfOptionDictionary, EventPerfTest } from "@azure/test-utils-perf";
import { sendMessages } from "./receiveBatch.spec";
import { ServiceBusTest } from "./sbBase.spec";

interface ReceiverOptions {
"number-of-messages": number;
"message-body-size-in-bytes": number;
"max-concurrent-calls": number;
}

export class SubscribeTest extends EventPerfTest<ReceiverOptions> {
receiver: ServiceBusReceiver;
subscriber: { close: () => Promise<void> } | undefined;

options: PerfOptionDictionary<ReceiverOptions> = {
"number-of-messages": {
required: true,
description: "Total number of messages to send",
shortName: "messages",
longName: "messages",
defaultValue: 100000,
},
"message-body-size-in-bytes": {
required: true,
description: "Size of each message body in bytes",
shortName: "size",
longName: "size-in-bytes",
defaultValue: 2000,
},
"max-concurrent-calls": {
required: true,
description: "max concurrent calls",
shortName: "mcc",
longName: "max-concurrent-calls",
defaultValue: 10,
},
};

constructor() {
super();
this.receiver = ServiceBusTest.sbClient.createReceiver(ServiceBusTest.queueName, {
receiveMode: "receiveAndDelete",
});
}

/**
* Sends the messages to be received later.
*/
async globalSetup(): Promise<void> {
await ServiceBusTest.sbAdminClient.createQueue(ServiceBusTest.queueName);
const sender = ServiceBusTest.sbClient.createSender(ServiceBusTest.queueName);

const {
"number-of-messages": { value: numberOfMessages },
"message-body-size-in-bytes": { value: messageBodySize },
} = this.parsedOptions;

await sendMessages(sender, numberOfMessages, messageBodySize);
}

setup() {
this.subscriber = this.receiver.subscribe(
{
processMessage: async (_message: ServiceBusReceivedMessage) => {
// { event: _message }
this.eventRaised();
},
processError: async (args: ProcessErrorArgs) => {
this.errorRaised(args.error);
},
},
{ maxConcurrentCalls: this.parsedOptions["max-concurrent-calls"].value }
);
}

async cleanup() {
this.subscriber && (await this.subscriber.close());
await this.receiver.close();
}

async globalCleanup(): Promise<void> {
await ServiceBusTest.sbClient.close();
await ServiceBusTest.sbAdminClient.deleteQueue(ServiceBusTest.queueName);
}
}
11 changes: 4 additions & 7 deletions sdk/test-utils/perf/src/batchPerfTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
} from "./testProxyHttpClient";
import { HttpClient } from "@azure/core-http";
import { PerfTestBase } from "./perfTestBase";
import { PerfParallel } from "./parallel";
import { AdditionalPolicyConfig } from "@azure/core-client";

/**
Expand Down Expand Up @@ -88,26 +87,24 @@ export abstract class BatchPerfTest<
* as well as the lastMillisecondsElapsed that reports the last test execution's elapsed time in comparison
* to the beginning of the execution of runLoop.
*
* @param parallel Object where to log the results from each execution.
* @param durationMilliseconds When to abort any execution.
* @param abortController Allows us to send through a signal determining when to abort any execution.
*/
public async runAll(
parallel: PerfParallel,
durationMilliseconds: number,
abortController: AbortController
): Promise<void> {
parallel.completedOperations = 0;
parallel.lastMillisecondsElapsed = 0;
this.completedOperations = 0;
this.lastMillisecondsElapsed = 0;
const start = process.hrtime();
while (!abortController.signal.aborted) {
const completedOperations = await this.runBatch(abortController.signal);

const elapsed = process.hrtime(start);
const elapsedMilliseconds = elapsed[0] * 1000 + elapsed[1] / 1000000;

parallel.completedOperations += completedOperations;
parallel.lastMillisecondsElapsed = elapsedMilliseconds;
this.completedOperations += completedOperations;
this.lastMillisecondsElapsed = elapsedMilliseconds;

// In runTest we create a setTimeout that is intended to abort the abortSignal
// once the durationMilliseconds have elapsed. That setTimeout might not get queued
Expand Down
Loading

0 comments on commit efe321c

Please sign in to comment.