Skip to content

Commit

Permalink
[EventHubs] Stress test refactor (Azure#20389)
Browse files Browse the repository at this point in the history
* init commit for stress

* add dotenv dep

* docker, helm, k8s

* apply changes

* run forever

* add chaos

* update chaos jos

* Apply suggestions from code review

Co-authored-by: Ben Broderick Phillips <ben@benbp.net>

* minor fixes

* revert external targets

Co-authored-by: Ben Broderick Phillips <ben@benbp.net>
  • Loading branch information
yunhaoling and benbp authored Sep 22, 2021
1 parent eca1c81 commit 4b3397d
Show file tree
Hide file tree
Showing 19 changed files with 415 additions and 38 deletions.
6 changes: 6 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/Chart.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dependencies:
- name: stress-test-addons
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.1.6
digest: sha256:b97697ef5f303eec43e9a94fca8e312d20b8aed71318250499344aeca9880d31
generated: "2021-08-24T11:24:15.375395-07:00"
13 changes: 13 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: v2
name: python-eventhubs-stress-test
description: python event hubs stress test.
version: 0.1.2
appVersion: v0.2
annotations:
stressTest: 'true'
namespace: python-eventhubs-stress-test-ns

dependencies:
- name: stress-test-addons
version: 0.1.6
repository: https://stresstestcharts.blob.core.windows.net/helm/
10 changes: 10 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# syntax=docker/dockerfile:1

FROM python:3.8-slim-buster

WORKDIR /app

COPY ./scripts /app/stress/scripts

WORKDIR /app/stress/scripts
RUN pip3 install -r dev_requirement.txt
8 changes: 0 additions & 8 deletions sdk/eventhub/azure-eventhub/stress/dev_requirement.txt

This file was deleted.

5 changes: 5 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/parameters.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentParameters.json#",
"contentVersion": "1.0.0.0",
"parameters": { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os

from opencensus.ext.azure import metrics_exporter
from opencensus.stats import aggregation as aggregation_module
from opencensus.stats import measure as measure_module
Expand All @@ -26,7 +23,7 @@ def __init__(self, test_name, test_description=None):
self.desc = test_description

events_measure_name = "The number of events handled by " + self.name
events_measure_desc = "The number of events handled by " + self.desc if self.desc else None
events_measure_desc = "The number of events handled by" + self.desc if self.desc else None
memory_measure_name = "memory usage percentage for " + self.name
memory_measure_desc = "memory usage percentage for " + self.desc if self.desc else None
cpu_measure_name = "cpu usage percentage for " + self.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
from collections import defaultdict
from functools import partial
from dotenv import load_dotenv

from azure.identity.aio import ClientSecretCredential
from azure.eventhub.aio import EventHubConsumerClient
Expand All @@ -22,6 +23,9 @@
from process_monitor import ProcessMonitor
from app_insights_metric import AzureMonitorMetric

ENV_FILE = os.environ.get('ENV_FILE')
load_dotenv(dotenv_path=ENV_FILE, override=True)


def parse_starting_position(args):
starting_position = None
Expand All @@ -39,25 +43,26 @@ def parse_starting_position(args):


parser = argparse.ArgumentParser()
parser.add_argument("--link_credit", default=3000, type=int)
parser.add_argument("--output_interval", type=float, default=1000)
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
parser.add_argument("--consumer_group", help="Consumer group name", default="$default")
parser.add_argument("--link_credit", default=int(os.environ.get("LINK_CREDIT", 3000)), type=int)
parser.add_argument("--output_interval", type=float, default=int(os.environ.get("OUTPUT_INTERVAL", 5000)))
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=int(os.environ.get("DURATION", 999999999)))
parser.add_argument("--consumer_group", help="Consumer group name", default=os.environ.get("CONSUMER_GROUP", "$default"))
parser.add_argument("--auth_timeout", help="Authorization Timeout", type=float, default=60)
parser.add_argument("--starting_offset", help="Starting offset", type=str)
parser.add_argument("--starting_offset", help="Starting offset", type=str, default=os.environ.get("STARTING_OFFSET", "-1"))
parser.add_argument("--starting_sequence_number", help="Starting sequence number", type=int)
parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss", type=str)
parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss")
parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0)
parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int)
parser.add_argument("--max_batch_size", type=int, default=0,
parser.add_argument("--max_batch_size", type=int, default=int(os.environ.get("MAX_BATCH_SIZE", 0)),
help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()")
parser.add_argument("--max_wait_time", type=float, default=0,
help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()")

parser.add_argument("--track_last_enqueued_event_properties", action="store_true")
parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10)
parser.add_argument("--conn_str", help="EventHub connection string",
default=os.environ.get('EVENT_HUB_PERF_32_CONN_STR'))
parser.add_argument("--eventhub", help="Name of EventHub")
default=os.environ.get('EVENT_HUB_CONN_STR'))
parser.add_argument("--eventhub", help="Name of EventHub", default=os.environ.get('EVENT_HUB_NAME'))
parser.add_argument("--address", help="Address URI to the EventHub entity")
parser.add_argument("--sas_policy", help="Name of the shared access policy to authenticate with")
parser.add_argument("--sas_key", help="Shared access key")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
from collections import defaultdict
from functools import partial
from dotenv import load_dotenv

from azure.identity import ClientSecretCredential
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
Expand All @@ -20,6 +21,9 @@
from process_monitor import ProcessMonitor
from app_insights_metric import AzureMonitorMetric

ENV_FILE = os.environ.get('ENV_FILE')
load_dotenv(dotenv_path=ENV_FILE, override=True)


def parse_starting_position(args):
starting_position = None
Expand All @@ -37,26 +41,26 @@ def parse_starting_position(args):


parser = argparse.ArgumentParser()
parser.add_argument("--link_credit", default=3000, type=int)
parser.add_argument("--output_interval", type=float, default=1000)
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
parser.add_argument("--consumer_group", help="Consumer group name", default="$default")
parser.add_argument("--link_credit", default=int(os.environ.get("LINK_CREDIT", 3000)), type=int)
parser.add_argument("--output_interval", type=float, default=int(os.environ.get("OUTPUT_INTERVAL", 5000)))
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=int(os.environ.get("DURATION", 999999999)))
parser.add_argument("--consumer_group", help="Consumer group name", default=os.environ.get("CONSUMER_GROUP", "$default"))
parser.add_argument("--auth_timeout", help="Authorization Timeout", type=float, default=60)
parser.add_argument("--starting_offset", help="Starting offset", type=str)
parser.add_argument("--starting_offset", help="Starting offset", type=str, default=os.environ.get("STARTING_OFFSET", "-1"))
parser.add_argument("--starting_sequence_number", help="Starting sequence number", type=int)
parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss")
parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0)
parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int)
parser.add_argument("--max_batch_size", type=int, default=0,
parser.add_argument("--max_batch_size", type=int, default=int(os.environ.get("MAX_BATCH_SIZE", 0)),
help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()")
parser.add_argument("--max_wait_time", type=float, default=0,
help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()")

parser.add_argument("--track_last_enqueued_event_properties", action="store_true")
parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10)
parser.add_argument("--conn_str", help="EventHub connection string",
default=os.environ.get('EVENT_HUB_PERF_32_CONN_STR'))
parser.add_argument("--eventhub", help="Name of EventHub")
default=os.environ.get('EVENT_HUB_CONN_STR'))
parser.add_argument("--eventhub", help="Name of EventHub", default=os.environ.get('EVENT_HUB_NAME'))
parser.add_argument("--address", help="Address URI to the EventHub entity")
parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with")
parser.add_argument("--sas-key", help="Shared access key")
Expand All @@ -82,7 +86,14 @@ def parse_starting_position(args):

args = parser.parse_args()
starting_position = parse_starting_position(args)
LOGGER = get_logger(args.log_filename, "stress_receive_sync", level=logging.INFO, print_console=args.print_console)
print_console = args.print_console or (os.environ.get("PRINT_CONSOLE") == "1")

LOGGER = get_logger(
args.log_filename,
"stress_receive_sync",
level=logging.INFO,
print_console=print_console
)
LOG_PER_COUNT = args.output_interval

start_time = time.perf_counter()
Expand All @@ -105,7 +116,6 @@ def on_event_received(process_monitor, partition_context, event):
recv_cnt_map[partition_context.partition_id] += 1 if event else 0
if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0:
total_time_elapsed = time.perf_counter() - start_time

partition_previous_time = recv_time_map.get(partition_context.partition_id)
partition_current_time = time.perf_counter()
recv_time_map[partition_context.partition_id] = partition_current_time
Expand Down Expand Up @@ -213,7 +223,11 @@ def create_client(args):

def run(args):

with ProcessMonitor("monitor_{}".format(args.log_filename), "consumer_stress_sync", print_console=args.print_console) as process_monitor:
with ProcessMonitor(
"monitor_{}".format(args.log_filename),
"consumer_stress_sync",
print_console=print_console
) as process_monitor:
kwargs_dict = {
"prefetch": args.link_credit,
"partition_id": str(args.recv_partition_id) if args.recv_partition_id else None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
import asyncio
from argparse import ArgumentParser
from dotenv import load_dotenv

from azure.eventhub import EventHubProducerClient, EventData, EventHubSharedKeyCredential, TransportType
from azure.eventhub.exceptions import EventHubError
Expand All @@ -20,6 +21,8 @@
from process_monitor import ProcessMonitor
from app_insights_metric import AzureMonitorMetric

ENV_FILE = os.environ.get('ENV_FILE')


def handle_exception(error, ignore_send_failure, stress_logger, azure_monitor_metric):
err_msg = "Sync send failed due to error: {}".format(repr(error))
Expand Down Expand Up @@ -89,9 +92,9 @@ async def stress_send_list_async(producer: EventHubProducerClientAsync, args, st
class StressTestRunner(object):
def __init__(self, argument_parser):
self.argument_parser = argument_parser
self.argument_parser.add_argument("-m", "--method", required=True)
self.argument_parser.add_argument("--output_interval", type=float, default=1000)
self.argument_parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
self.argument_parser.add_argument("-m", "--method", default="stress_send_list_sync")
self.argument_parser.add_argument("--output_interval", type=float, default=int(os.environ.get("OUTPUT_INTERVAL", 5000)))
self.argument_parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=int(os.environ.get("DURATION", 999999999)))
self.argument_parser.add_argument(
"--partitions",
help="Number of partitions. 0 means to get partitions from eventhubs",
Expand All @@ -109,9 +112,9 @@ def __init__(self, argument_parser):
type=str
)
self.argument_parser.add_argument("--conn_str", help="EventHub connection string",
default=os.environ.get('EVENT_HUB_PERF_32_CONN_STR'))
default=os.environ.get('EVENT_HUB_CONN_STR'))
parser.add_argument("--auth_timeout", help="Authorization Timeout", type=float, default=60)
self.argument_parser.add_argument("--eventhub", help="Name of EventHub")
self.argument_parser.add_argument("--eventhub", help="Name of EventHub", default=os.environ.get('EVENT_HUB_NAME'))
self.argument_parser.add_argument(
"--transport_type",
help="Transport type, 0 means AMQP, 1 means AMQP over WebSocket",
Expand Down Expand Up @@ -412,6 +415,7 @@ async def run_test_method_parallel_async(self, test_method, worker, logger, proc


if __name__ == '__main__':
load_dotenv(dotenv_path=ENV_FILE, override=True)
parser = ArgumentParser()
runner = StressTestRunner(parser)
runner.run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
psutil
azure-eventhub
azure-eventhub-checkpointstoreblob
azure-eventhub-checkpointstoreblob-aio
azure-servicebus==0.50.3
azure-storage-blob
azure-identity
opencensus-ext-azure
python-dotenv
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import sys
import logging
from logging.handlers import RotatingFileHandler
Expand Down
31 changes: 31 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/templates/network_loss.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
apiVersion: chaos-mesh.org/v1alpha1
kind: NetworkChaos
metadata:
name: '{{ .Release.Name }}-{{ .Release.Revision }}'
namespace: {{ .Release.Namespace }}
annotations:
experiment.chaos-mesh.org/pause: 'true'
labels:
scenario: 'stress'
release: '{{ .Release.Name }}'
revision: '{{ .Release.Revision }}'
spec:
scheduler:
cron: '@every 30s'
duration: '10s'
action: loss
direction: to
externalTargets:
- 'eh-stress-{{ .Release.Name }}-{{ .Release.Revision }}.servicebus.windows.net'
mode: one
selector:
labelSelectors:
testInstance: "eventhub-{{ .Release.Name }}-{{ .Release.Revision }}"
chaos: 'true'
namespaces:
- {{ .Release.Namespace }}
podPhaseSelectors:
- 'Running'
loss:
loss: '100'
correlation: '100'
16 changes: 16 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/templates/testjob.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{{- include "stress-test-addons.deploy-job-template.from-pod" (list . "stress.python-eh-stress") -}}
{{- define "stress.python-eh-stress" -}}
metadata:
labels:
testName: "deploy-python-eh-stress"
testInstance: "eventhub-{{ .Release.Name }}-{{ .Release.Revision }}"
chaos: "true"
spec:
containers:
- name: python-eh-stress
image: {{ default "stresstestregistry.azurecr.io/python" .Values.repository }}/stress:{{ default "v2" .Values.tag }}
command: ['bash', '-c', 'python3 azure_eventhub_producer_stress.py & python3 azure_eventhub_consumer_stress_sync.py']
{{- include "stress-test-addons.container-env" . | nindent 6 }}
# async test command
# command: ['bash', '-c', 'python3 azure_eventhub_producer_stress.py -m stress_send_list_async & python3 azure_eventhub_consumer_stress_async.py']
{{- end -}}
Loading

0 comments on commit 4b3397d

Please sign in to comment.