Skip to content

Commit

Permalink
Stress testing updates (#27456)
Browse files Browse the repository at this point in the history
* can uncomment line 5 to run against git version of pyamqp

* can move line 6 into scripts/dev_requirement file

* test against newest version of pyamqp

* increase test time, get rid of unused tests

* removing test names

* change naming

* change naming

* raise logging level to catch only error level

* return logging to info

* adding resource requests

* message retention needs to last as long as the test

* websocket async test

* changing life of messages

* 32 partitions

* add uamqp flag - remove logging

* adding before trying matrix

* websocket dep

* helm ignore

* updating

* updating tests

* update consumer files

* remove log lines

* remove log lines

* remove commented

* update
  • Loading branch information
l0lawrence authored Nov 11, 2022
1 parent 3711eeb commit 0a14a3c
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 54 deletions.
8 changes: 8 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/.helmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
stress
stress.exe
.env
Dockerfile
*.py
__pycache__
scripts
*.txt
2 changes: 2 additions & 0 deletions sdk/eventhub/azure-eventhub/stress/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# public OSS users should simply leave this argument blank or ignore its presence entirely
ARG REGISTRY="mcr.microsoft.com/mirror/docker/library/"
FROM ${REGISTRY}python:3.8-slim-buster
# RUN apt-get -y update && apt-get -y install git
# git+https://github.com/Azure/azure-sdk-for-python.git@feature/eventhub/pyproto#subdirectory=sdk/eventhub/azure-eventhub&egg=azure-eventhub

WORKDIR /app

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ def __init__(self, test_name, test_description=None):
self.name = test_name
self.desc = test_description

events_measure_name = "The number of events handled by " + self.name
events_measure_name = "NumEvents" + self.name
events_measure_desc = "The number of events handled by" + self.desc if self.desc else None
memory_measure_name = "memory usage percentage for " + self.name
memory_measure_desc = "memory usage percentage for " + self.desc if self.desc else None
cpu_measure_name = "cpu usage percentage for " + self.name
cpu_measure_desc = "cpu usage percentage for " + self.desc if self.desc else None
error_measure_name = "error count for " + self.name
memory_measure_name = "Memory " + self.name
memory_measure_desc = "Memory usage percentage for " + self.desc if self.desc else None
cpu_measure_name = "Cpu " + self.name
cpu_measure_desc = "Cpu usage percentage for " + self.desc if self.desc else None
error_measure_name = "Errors " + self.name
error_measure_desc = "The number of errors happened while running the test for " + self.desc if self.desc else None

self.events_measure = measure_module.MeasureInt(events_measure_name, events_measure_desc, "events")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def parse_starting_position(args):
parser.add_argument("--pyamqp_logging_enable", help="pyamqp logging enable", action="store_true")
parser.add_argument("--print_console", help="print to console", action="store_true")
parser.add_argument("--log_filename", help="log file name", type=str)
parser.add_argument("--uamqp_mode", help="Flag for uamqp or pyamqp", action="store_true")

args = parser.parse_args()
starting_position = parse_starting_position(args)
Expand Down Expand Up @@ -190,7 +191,8 @@ def create_client(args):
auth_timeout=args.auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=args.uamqp_logging_enable
logging_enable=args.pyamqp_logging_enable,
uamqp_transport=args.uamqp_mode,
)
elif args.conn_str:
client = EventHubConsumerClientTest.from_connection_string(
Expand All @@ -202,7 +204,8 @@ def create_client(args):
auth_timeout=args.auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=args.pyamqp_logging_enable
logging_enable=args.pyamqp_logging_enable,
uamqp_transport=args.uamqp_mode,
)
elif args.hostname:
client = EventHubConsumerClientTest(
Expand All @@ -215,7 +218,8 @@ def create_client(args):
auth_timeout=args.auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=args.pyamqp_logging_enable
logging_enable=args.pyamqp_logging_enable,
uamqp_transport=args.uamqp_mode,
)
elif args.aad_client_id:
credential = ClientSecretCredential(args.tenant_id, args.aad_client_id, args.aad_secret)
Expand All @@ -229,7 +233,8 @@ def create_client(args):
auth_timeout=args.auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=args.pyamqp_logging_enable
logging_enable=args.pyamqp_logging_enable,
uamqp_transport=args.uamqp_mode,
)

return client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def parse_starting_position(args):
parser.add_argument("--pyamqp_logging_enable", help="pyamqp logging enable", action="store_true")
parser.add_argument("--print_console", help="print to console", action="store_true")
parser.add_argument("--log_filename", help="log file name", type=str)
parser.add_argument("--uamqp_mode", help="Flag for uamqp or pyamqp", action="store_true")

args = parser.parse_args()
starting_position = parse_starting_position(args)
Expand Down Expand Up @@ -191,7 +192,8 @@ def create_client(args):
auth_timeout=args.auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=args.uamqp_logging_enable
logging_enable=args.pyamqp_logging_enable,
uamqp_transport=args.uamqp_mode,
)
elif args.conn_str:
client = EventHubConsumerClientTest.from_connection_string(
Expand All @@ -203,7 +205,8 @@ def create_client(args):
auth_timeout=args.auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=args.pyamqp_logging_enable
logging_enable=args.pyamqp_logging_enable,
uamqp_transport=args.uamqp_mode,
)
elif args.hostname:
client = EventHubConsumerClientTest(
Expand All @@ -216,7 +219,8 @@ def create_client(args):
auth_timeout=args.auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=args.pyamqp_logging_enable
logging_enable=args.pyamqp_logging_enable,
uamqp_transport=args.uamqp_mode,
)
elif args.aad_client_id:
credential = ClientSecretCredential(args.tenant_id, args.aad_client_id, args.aad_secret)
Expand All @@ -230,7 +234,8 @@ def create_client(args):
auth_timeout=args.auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=args.pyamqp_logging_enable
logging_enable=args.pyamqp_logging_enable,
uamqp_transport=args.uamqp_mode,
)

return client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,24 @@ def handle_exception(error, ignore_send_failure, stress_logger, azure_monitor_me
return 0
raise error

def on_success(events, pid):
# sending succeeded
pass


def on_error(events, pid, error):
# sending failed
pass

async def on_success_async(events, pid):
# sending succeeded
pass


async def on_error_async(events, pid, error):
# sending failed
pass


def stress_send_sync(producer: EventHubProducerClient, args, stress_logger, azure_monitor_metric):
try:
Expand Down Expand Up @@ -129,6 +147,7 @@ def __init__(self, argument_parser):
action="store_true",
help="Whether create new client for each sending",
)
self.argument_parser.add_argument("--buffered_mode", help="buffer producer", action="store_true")
self.argument_parser.add_argument("--proxy_hostname", type=str)
self.argument_parser.add_argument("--proxy_port", type=str)
self.argument_parser.add_argument("--proxy_username", type=str)
Expand All @@ -147,6 +166,7 @@ def __init__(self, argument_parser):
self.argument_parser.add_argument("--retry_backoff_factor", type=float, default=0.8)
self.argument_parser.add_argument("--retry_backoff_max", type=float, default=120)
self.argument_parser.add_argument("--ignore_send_failure", help="ignore sending failures", action="store_true")
self.argument_parser.add_argument("--uamqp_mode", help="Flag for uamqp or pyamqp", action="store_true")
self.args, _ = parser.parse_known_args()

if self.args.send_partition_key and self.args.send_partition_id:
Expand All @@ -163,15 +183,44 @@ def create_client(self, client_class, is_async=False):
"retry_backoff_factor": self.args.retry_backoff_factor,
"retry_backoff_max": self.args.retry_backoff_max
}

if self.args.proxy_hostname:
http_proxy = {
"proxy_hostname": self.args.proxy_hostname,
"proxy_port": self.args.proxy_port,
"username": self.args.proxy_username,
"password": self.args.proxy_password,
}

if self.args.azure_identity:
if self.args.buffered_mode:
if is_async:
client = client_class.from_connection_string(
self.args.conn_str,
eventhub_name=self.args.eventhub,
auth_timeout=self.args.auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=self.args.pyamqp_logging_enable,
buffered_mode=self.args.buffered_mode,
on_success=on_success_async,
on_error=on_error_async,
uamqp_transport=self.args.uamqp_mode,
**retry_options
)
else:
client = client_class.from_connection_string(
self.args.conn_str,
eventhub_name=self.args.eventhub,
auth_timeout=self.args.auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=self.args.pyamqp_logging_enable,
buffered_mode=self.args.buffered_mode,
on_success=on_success,
on_error=on_error,
uamqp_transport=self.args.uamqp_mode,
**retry_options
)
elif self.args.azure_identity:
print("Using Azure Identity")
client = client_class(
fully_qualified_namespace=self.args.hostname,
Expand All @@ -181,6 +230,7 @@ def create_client(self, client_class, is_async=False):
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=self.args.pyamqp_logging_enable,
uamqp_transport=self.args.uamqp_mode,
**retry_options
)
elif self.args.conn_str:
Expand All @@ -191,6 +241,7 @@ def create_client(self, client_class, is_async=False):
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=self.args.pyamqp_logging_enable,
uamqp_transport=self.args.uamqp_mode,
**retry_options
)
elif self.args.hostname:
Expand All @@ -202,6 +253,7 @@ def create_client(self, client_class, is_async=False):
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=self.args.pyamqp_logging_enable,
uamqp_transport=self.args.uamqp_mode,
**retry_options
)
elif self.args.aad_client_id:
Expand All @@ -217,6 +269,7 @@ def create_client(self, client_class, is_async=False):
http_proxy=http_proxy,
transport_type=transport_type,
logging_enable=self.args.pyamqp_logging_enable,
uamqp_transport=self.args.uamqp_mode,
**retry_options
)
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
psutil
azure-eventhub==5.8.0a5
azure-eventhub
azure-eventhub-checkpointstoreblob
azure-eventhub-checkpointstoreblob-aio
azure-servicebus==0.50.3
azure-servicebus==7.8.1
azure-storage-blob
azure-identity
opencensus-ext-azure
python-dotenv
websocket-client
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ resource eventHubsNamespace_eventHubName 'Microsoft.EventHub/namespaces/eventhub
name: '${eventHubsNamespace_var}/${eventHubName}'
location: location
properties: {
messageRetentionInDays: 1
messageRetentionInDays: 5
partitionCount: 32
}
dependsOn: [
Expand Down Expand Up @@ -104,4 +104,4 @@ output EVENT_HUB_SAS_POLICY string = eventHubAuthRuleName
output EVENT_HUB_SAS_KEY string = listkeys(eventHubAuthRuleName, ehVersion).primaryKey
output AZURE_STORAGE_CONN_STR string = 'DefaultEndpointsProtocol=https;AccountName=${storageAccount_var};AccountKey=${listKeys(storageAccountId, providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value};EndpointSuffix=${storageEndpointSuffix}'
output AZURE_STORAGE_ACCOUNT string = storageAccount_var
output AZURE_STORAGE_ACCESS_KEY string = listKeys(storageAccountId, providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value
output AZURE_STORAGE_ACCESS_KEY string = listKeys(storageAccountId, providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value
55 changes: 32 additions & 23 deletions sdk/eventhub/azure-eventhub/stress/templates/testjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,64 @@ metadata:
labels:
testName: "deploy-python-eh-stress"
testInstance: "eventhub-{{ .Release.Name }}-{{ .Release.Revision }}"
chaos: "true"
spec:
nodeSelector:
sku: 'd4v4'
containers:
- name: python-eh-stress
image: {{ .Values.image }}
imagePullPolicy: Always
resources:
limits:
memory: "2000Mi"
cpu: "1"

{{ if eq .Stress.Scenario "identity" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --azure_identity True --pyamqp_logging_enable --print_console --duration 7200']
{{ if eq .Stress.Scenario "event-async" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_async --duration 259200 & python azure_eventhub_consumer_stress_async.py --duration 259200 ']
{{- end -}}

{{ if eq .Stress.Scenario "sendsync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 7200']
{{ if eq .Stress.Scenario "event-sync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 259200 & python azure_eventhub_consumer_stress_sync.py --duration 259200 ']
{{- end -}}

{{ if eq .Stress.Scenario "sendlistsync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 7200']
{{ if eq .Stress.Scenario "batch-async" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_async --duration 259200 & python azure_eventhub_consumer_stress_async.py --duration 259200 ']
{{- end -}}

{{ if eq .Stress.Scenario "sendasync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 7200']
{{ if eq .Stress.Scenario "batch-sync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 259200 & python azure_eventhub_consumer_stress_sync.py --duration 259200 ']
{{- end -}}

{{ if eq .Stress.Scenario "sendlistasync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 7200']
{{ if eq .Stress.Scenario "bufferedproducerlistsync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 259200 --buffered_mode & python azure_eventhub_consumer_stress_sync.py --duration 259200 ']
{{- end -}}

{{ if eq .Stress.Scenario "sendconsumeasync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_async --duration 7200 & python azure_eventhub_consumer_stress_async.py --duration 7200']
{{ if eq .Stress.Scenario "bufferedproducerasync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_async --duration 259200 --buffered_mode & python azure_eventhub_consumer_stress_async.py --duration 259200']
{{- end -}}

{{ if eq .Stress.Scenario "sendconsumesync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 7200 & python azure_eventhub_consumer_stress_sync.py --duration 7200']
{{ if eq .Stress.Scenario "bufferedproducerlistasync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_async --duration 259200 --buffered_mode & python azure_eventhub_consumer_stress_async.py --duration 259200 ']
{{- end -}}

{{ if eq .Stress.Scenario "sendlistconsumeasync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_async --duration 7200 & python azure_eventhub_consumer_stress_async.py --duration 7200']
{{ if eq .Stress.Scenario "bufferedproducersync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 259200 --buffered_mode & python azure_eventhub_consumer_stress_sync.py --duration 259200']
{{- end -}}

{{ if eq .Stress.Scenario "sendlistconsumesync" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 7200 & python azure_eventhub_consumer_stress_sync.py --duration 7200']
{{ if eq .Stress.Scenario "syncwebsockets" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 259200 --transport_type 1 & python azure_eventhub_consumer_stress_sync.py --duration 259200 --transport_type 1']
{{- end -}}

{{ if eq .Stress.Scenario "consumeasyncidentity" }}
command: ['bash', '-c', 'python azure_eventhub_consumer_stress_async.py --azure_identity True --duration 7200']
{{ if eq .Stress.Scenario "asyncwebsockets" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_async --duration 259200 --transport_type 1 & python azure_eventhub_consumer_stress_async.py --duration 259200 --transport_type 1']
{{- end -}}

{{ if eq .Stress.Scenario "consumesyncidentity" }}
command: ['bash', '-c', 'python azure_eventhub_consumer_stress_sync.py --azure_identity True --duration 7200']
{{ if eq .Stress.Scenario "sync-batch-web" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 259200 --transport_type 1 & python azure_eventhub_consumer_stress_sync.py --duration 259200 --transport_type 1']
{{- end -}}

{{ if eq .Stress.Scenario "async-batch-web" }}
command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_async --duration 259200 --transport_type 1 & python azure_eventhub_consumer_stress_async.py --duration 259200 --transport_type 1']
{{- end -}}

{{- include "stress-test-addons.container-env" . | nindent 6 }}
Expand Down
23 changes: 12 additions & 11 deletions sdk/eventhub/azure-eventhub/stress/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
# one for each scenario in the list. The pod spec can then be configured to pass the
# scenario name down to the test command, e.g. `command: ["node", "{{ .Scenario }}.js"]`
scenarios:
- "identity"
- "sendsync"
- "sendlistsync"
- "sendasync"
- "sendlistasync"
- "sendconsumeasync"
- "sendconsumesync"
- "sendlistconsumeasync"
- "sendlistconsumesync"
- "consumeasyncidentity"
- "consumesyncidentity"
- "event-async"
- "event-sync"
- "batch-async"
- "batch-sync"
- "bufferedproducerlistsync"
- "bufferedproducerasync"
- "bufferedproducerlistasync"
- "bufferedproducersync"
- "syncwebsockets"
- "asyncwebsockets"
- "sync-batch-web"
- "async-batch-web"

0 comments on commit 0a14a3c

Please sign in to comment.