From 0a14a3cd7baba19bed806a641032352ce4f9d313 Mon Sep 17 00:00:00 2001 From: Libba Lawrence Date: Fri, 11 Nov 2022 14:48:53 -0800 Subject: [PATCH] Stress testing updates (#27456) * can uncomment line 5 to run against git version of pyamqp * can move line 6 into scripts/dev_requirement file * test against newest version of pyamqp * increase test time, get rid of unused tests * removing test names * change naming * change naming * raise logging level to catch only error level * return logging to info * adding resource requests * message retention needs to last as long as the test * websocket async test * changing life of messages * 32 partitions * add uamqp flag - remove logging * adding before trying matrix * websocket dep * helm ignore * updating * updating tests * update consumer files * remove log lines * remove log lines * remove commented * update --- .../azure-eventhub/stress/.helmignore | 8 +++ sdk/eventhub/azure-eventhub/stress/Dockerfile | 2 + .../stress/scripts/app_insights_metric.py | 12 ++-- .../azure_eventhub_consumer_stress_async.py | 13 +++-- .../azure_eventhub_consumer_stress_sync.py | 13 +++-- .../scripts/azure_eventhub_producer_stress.py | 57 ++++++++++++++++++- .../stress/scripts/dev_requirement.txt | 5 +- .../stress/stress-test-resources.bicep | 4 +- .../stress/templates/testjob.yaml | 55 ++++++++++-------- .../azure-eventhub/stress/values.yaml | 23 ++++---- 10 files changed, 138 insertions(+), 54 deletions(-) create mode 100644 sdk/eventhub/azure-eventhub/stress/.helmignore diff --git a/sdk/eventhub/azure-eventhub/stress/.helmignore b/sdk/eventhub/azure-eventhub/stress/.helmignore new file mode 100644 index 000000000000..8d4df59d826b --- /dev/null +++ b/sdk/eventhub/azure-eventhub/stress/.helmignore @@ -0,0 +1,8 @@ +stress +stress.exe +.env +Dockerfile +*.py +__pycache__ +scripts +*.txt \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhub/stress/Dockerfile b/sdk/eventhub/azure-eventhub/stress/Dockerfile index 033b01c3fa0c..88f7099b6ceb 100644 --- a/sdk/eventhub/azure-eventhub/stress/Dockerfile +++ b/sdk/eventhub/azure-eventhub/stress/Dockerfile @@ -2,6 +2,8 @@ # public OSS users should simply leave this argument blank or ignore its presence entirely ARG REGISTRY="mcr.microsoft.com/mirror/docker/library/" FROM ${REGISTRY}python:3.8-slim-buster +# RUN apt-get -y update && apt-get -y install git +# git+https://github.com/Azure/azure-sdk-for-python.git@feature/eventhub/pyproto#subdirectory=sdk/eventhub/azure-eventhub&egg=azure-eventhub WORKDIR /app diff --git a/sdk/eventhub/azure-eventhub/stress/scripts/app_insights_metric.py b/sdk/eventhub/azure-eventhub/stress/scripts/app_insights_metric.py index 9f814fdf2845..245f2bfd4735 100644 --- a/sdk/eventhub/azure-eventhub/stress/scripts/app_insights_metric.py +++ b/sdk/eventhub/azure-eventhub/stress/scripts/app_insights_metric.py @@ -22,13 +22,13 @@ def __init__(self, test_name, test_description=None): self.name = test_name self.desc = test_description - events_measure_name = "The number of events handled by " + self.name + events_measure_name = "NumEvents" + self.name events_measure_desc = "The number of events handled by" + self.desc if self.desc else None - memory_measure_name = "memory usage percentage for " + self.name - memory_measure_desc = "memory usage percentage for " + self.desc if self.desc else None - cpu_measure_name = "cpu usage percentage for " + self.name - cpu_measure_desc = "cpu usage percentage for " + self.desc if self.desc else None - error_measure_name = "error count for " + self.name + memory_measure_name = "Memory " + self.name + memory_measure_desc = "Memory usage percentage for " + self.desc if self.desc else None + cpu_measure_name = "Cpu " + self.name + cpu_measure_desc = "Cpu usage percentage for " + self.desc if self.desc else None + error_measure_name = "Errors " + self.name error_measure_desc = "The number of errors happened while running the test for " + self.desc if self.desc else None self.events_measure = measure_module.MeasureInt(events_measure_name, events_measure_desc, "events") diff --git a/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_consumer_stress_async.py b/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_consumer_stress_async.py index 99a651645ddd..a8f8fa3ddca6 100644 --- a/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_consumer_stress_async.py +++ b/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_consumer_stress_async.py @@ -88,6 +88,7 @@ def parse_starting_position(args): parser.add_argument("--pyamqp_logging_enable", help="pyamqp logging enable", action="store_true") parser.add_argument("--print_console", help="print to console", action="store_true") parser.add_argument("--log_filename", help="log file name", type=str) +parser.add_argument("--uamqp_mode", help="Flag for uamqp or pyamqp", action="store_true") args = parser.parse_args() starting_position = parse_starting_position(args) @@ -190,7 +191,8 @@ def create_client(args): auth_timeout=args.auth_timeout, http_proxy=http_proxy, transport_type=transport_type, - logging_enable=args.uamqp_logging_enable + logging_enable=args.pyamqp_logging_enable, + uamqp_transport=args.uamqp_mode, ) elif args.conn_str: client = EventHubConsumerClientTest.from_connection_string( @@ -202,7 +204,8 @@ def create_client(args): auth_timeout=args.auth_timeout, http_proxy=http_proxy, transport_type=transport_type, - logging_enable=args.pyamqp_logging_enable + logging_enable=args.pyamqp_logging_enable, + uamqp_transport=args.uamqp_mode, ) elif args.hostname: client = EventHubConsumerClientTest( @@ -215,7 +218,8 @@ def create_client(args): auth_timeout=args.auth_timeout, http_proxy=http_proxy, transport_type=transport_type, - logging_enable=args.pyamqp_logging_enable + logging_enable=args.pyamqp_logging_enable, + uamqp_transport=args.uamqp_mode, ) elif args.aad_client_id: credential = ClientSecretCredential(args.tenant_id, args.aad_client_id, args.aad_secret) @@ -229,7 +233,8 @@ def create_client(args): auth_timeout=args.auth_timeout, http_proxy=http_proxy, transport_type=transport_type, - logging_enable=args.pyamqp_logging_enable + logging_enable=args.pyamqp_logging_enable, + uamqp_transport=args.uamqp_mode, ) return client diff --git a/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_consumer_stress_sync.py b/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_consumer_stress_sync.py index 17f9697b7304..f77d56b6b689 100644 --- a/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_consumer_stress_sync.py +++ b/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_consumer_stress_sync.py @@ -86,6 +86,7 @@ def parse_starting_position(args): parser.add_argument("--pyamqp_logging_enable", help="pyamqp logging enable", action="store_true") parser.add_argument("--print_console", help="print to console", action="store_true") parser.add_argument("--log_filename", help="log file name", type=str) +parser.add_argument("--uamqp_mode", help="Flag for uamqp or pyamqp", action="store_true") args = parser.parse_args() starting_position = parse_starting_position(args) @@ -191,7 +192,8 @@ def create_client(args): auth_timeout=args.auth_timeout, http_proxy=http_proxy, transport_type=transport_type, - logging_enable=args.uamqp_logging_enable + logging_enable=args.pyamqp_logging_enable, + uamqp_transport=args.uamqp_mode, ) elif args.conn_str: client = EventHubConsumerClientTest.from_connection_string( @@ -203,7 +205,8 @@ def create_client(args): auth_timeout=args.auth_timeout, http_proxy=http_proxy, transport_type=transport_type, - logging_enable=args.pyamqp_logging_enable + logging_enable=args.pyamqp_logging_enable, + uamqp_transport=args.uamqp_mode, ) elif args.hostname: client = EventHubConsumerClientTest( @@ -216,7 +219,8 @@ def create_client(args): auth_timeout=args.auth_timeout, http_proxy=http_proxy, transport_type=transport_type, - logging_enable=args.pyamqp_logging_enable + logging_enable=args.pyamqp_logging_enable, + uamqp_transport=args.uamqp_mode, ) elif args.aad_client_id: credential = ClientSecretCredential(args.tenant_id, args.aad_client_id, args.aad_secret) @@ -230,7 +234,8 @@ def create_client(args): auth_timeout=args.auth_timeout, http_proxy=http_proxy, transport_type=transport_type, - logging_enable=args.pyamqp_logging_enable + logging_enable=args.pyamqp_logging_enable, + uamqp_transport=args.uamqp_mode, ) return client diff --git a/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_producer_stress.py b/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_producer_stress.py index beae73545fe4..a89c9d8291f3 100644 --- a/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_producer_stress.py +++ b/sdk/eventhub/azure-eventhub/stress/scripts/azure_eventhub_producer_stress.py @@ -32,6 +32,24 @@ def handle_exception(error, ignore_send_failure, stress_logger, azure_monitor_me return 0 raise error +def on_success(events, pid): + # sending succeeded + pass + + +def on_error(events, pid, error): + # sending failed + pass + +async def on_success_async(events, pid): + # sending succeeded + pass + + +async def on_error_async(events, pid, error): + # sending failed + pass + def stress_send_sync(producer: EventHubProducerClient, args, stress_logger, azure_monitor_metric): try: @@ -129,6 +147,7 @@ def __init__(self, argument_parser): action="store_true", help="Whether create new client for each sending", ) + self.argument_parser.add_argument("--buffered_mode", help="buffer producer", action="store_true") self.argument_parser.add_argument("--proxy_hostname", type=str) self.argument_parser.add_argument("--proxy_port", type=str) self.argument_parser.add_argument("--proxy_username", type=str) @@ -147,6 +166,7 @@ def __init__(self, argument_parser): self.argument_parser.add_argument("--retry_backoff_factor", type=float, default=0.8) self.argument_parser.add_argument("--retry_backoff_max", type=float, default=120) self.argument_parser.add_argument("--ignore_send_failure", help="ignore sending failures", action="store_true") + self.argument_parser.add_argument("--uamqp_mode", help="Flag for uamqp or pyamqp", action="store_true") self.args, _ = parser.parse_known_args() if self.args.send_partition_key and self.args.send_partition_id: @@ -163,6 +183,7 @@ def create_client(self, client_class, is_async=False): "retry_backoff_factor": self.args.retry_backoff_factor, "retry_backoff_max": self.args.retry_backoff_max } + if self.args.proxy_hostname: http_proxy = { "proxy_hostname": self.args.proxy_hostname, @@ -170,8 +191,36 @@ def create_client(self, client_class, is_async=False): "username": self.args.proxy_username, "password": self.args.proxy_password, } - - if self.args.azure_identity: + if self.args.buffered_mode: + if is_async: + client = client_class.from_connection_string( + self.args.conn_str, + eventhub_name=self.args.eventhub, + auth_timeout=self.args.auth_timeout, + http_proxy=http_proxy, + transport_type=transport_type, + logging_enable=self.args.pyamqp_logging_enable, + buffered_mode=self.args.buffered_mode, + on_success=on_success_async, + on_error=on_error_async, + uamqp_transport=self.args.uamqp_mode, + **retry_options + ) + else: + client = client_class.from_connection_string( + self.args.conn_str, + eventhub_name=self.args.eventhub, + auth_timeout=self.args.auth_timeout, + http_proxy=http_proxy, + transport_type=transport_type, + logging_enable=self.args.pyamqp_logging_enable, + buffered_mode=self.args.buffered_mode, + on_success=on_success, + on_error=on_error, + uamqp_transport=self.args.uamqp_mode, + **retry_options + ) + elif self.args.azure_identity: print("Using Azure Identity") client = client_class( fully_qualified_namespace=self.args.hostname, @@ -181,6 +230,7 @@ def create_client(self, client_class, is_async=False): http_proxy=http_proxy, transport_type=transport_type, logging_enable=self.args.pyamqp_logging_enable, + uamqp_transport=self.args.uamqp_mode, **retry_options ) elif self.args.conn_str: @@ -191,6 +241,7 @@ def create_client(self, client_class, is_async=False): http_proxy=http_proxy, transport_type=transport_type, logging_enable=self.args.pyamqp_logging_enable, + uamqp_transport=self.args.uamqp_mode, **retry_options ) elif self.args.hostname: @@ -202,6 +253,7 @@ def create_client(self, client_class, is_async=False): http_proxy=http_proxy, transport_type=transport_type, logging_enable=self.args.pyamqp_logging_enable, + uamqp_transport=self.args.uamqp_mode, **retry_options ) elif self.args.aad_client_id: @@ -217,6 +269,7 @@ def create_client(self, client_class, is_async=False): http_proxy=http_proxy, transport_type=transport_type, logging_enable=self.args.pyamqp_logging_enable, + uamqp_transport=self.args.uamqp_mode, **retry_options ) else: diff --git a/sdk/eventhub/azure-eventhub/stress/scripts/dev_requirement.txt b/sdk/eventhub/azure-eventhub/stress/scripts/dev_requirement.txt index 70e626492a26..b7ff5a1bb209 100644 --- a/sdk/eventhub/azure-eventhub/stress/scripts/dev_requirement.txt +++ b/sdk/eventhub/azure-eventhub/stress/scripts/dev_requirement.txt @@ -1,9 +1,10 @@ psutil -azure-eventhub==5.8.0a5 +azure-eventhub azure-eventhub-checkpointstoreblob azure-eventhub-checkpointstoreblob-aio -azure-servicebus==0.50.3 +azure-servicebus==7.8.1 azure-storage-blob azure-identity opencensus-ext-azure python-dotenv +websocket-client diff --git a/sdk/eventhub/azure-eventhub/stress/stress-test-resources.bicep b/sdk/eventhub/azure-eventhub/stress/stress-test-resources.bicep index 856f6352e2d2..3ff69f8cb492 100644 --- a/sdk/eventhub/azure-eventhub/stress/stress-test-resources.bicep +++ b/sdk/eventhub/azure-eventhub/stress/stress-test-resources.bicep @@ -36,7 +36,7 @@ resource eventHubsNamespace_eventHubName 'Microsoft.EventHub/namespaces/eventhub name: '${eventHubsNamespace_var}/${eventHubName}' location: location properties: { - messageRetentionInDays: 1 + messageRetentionInDays: 5 partitionCount: 32 } dependsOn: [ @@ -104,4 +104,4 @@ output EVENT_HUB_SAS_POLICY string = eventHubAuthRuleName output EVENT_HUB_SAS_KEY string = listkeys(eventHubAuthRuleName, ehVersion).primaryKey output AZURE_STORAGE_CONN_STR string = 'DefaultEndpointsProtocol=https;AccountName=${storageAccount_var};AccountKey=${listKeys(storageAccountId, providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value};EndpointSuffix=${storageEndpointSuffix}' output AZURE_STORAGE_ACCOUNT string = storageAccount_var -output AZURE_STORAGE_ACCESS_KEY string = listKeys(storageAccountId, providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value \ No newline at end of file +output AZURE_STORAGE_ACCESS_KEY string = listKeys(storageAccountId, providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value diff --git a/sdk/eventhub/azure-eventhub/stress/templates/testjob.yaml b/sdk/eventhub/azure-eventhub/stress/templates/testjob.yaml index 410d8722e565..0563f7422ee8 100644 --- a/sdk/eventhub/azure-eventhub/stress/templates/testjob.yaml +++ b/sdk/eventhub/azure-eventhub/stress/templates/testjob.yaml @@ -4,55 +4,64 @@ metadata: labels: testName: "deploy-python-eh-stress" testInstance: "eventhub-{{ .Release.Name }}-{{ .Release.Revision }}" - chaos: "true" spec: + nodeSelector: + sku: 'd4v4' containers: - name: python-eh-stress image: {{ .Values.image }} imagePullPolicy: Always + resources: + limits: + memory: "2000Mi" + cpu: "1" - {{ if eq .Stress.Scenario "identity" }} - command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --azure_identity True --pyamqp_logging_enable --print_console --duration 7200'] + {{ if eq .Stress.Scenario "event-async" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_async --duration 259200 & python azure_eventhub_consumer_stress_async.py --duration 259200 '] {{- end -}} - {{ if eq .Stress.Scenario "sendsync" }} - command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 7200'] + {{ if eq .Stress.Scenario "event-sync" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 259200 & python azure_eventhub_consumer_stress_sync.py --duration 259200 '] {{- end -}} - {{ if eq .Stress.Scenario "sendlistsync" }} - command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 7200'] + {{ if eq .Stress.Scenario "batch-async" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_async --duration 259200 & python azure_eventhub_consumer_stress_async.py --duration 259200 '] {{- end -}} - {{ if eq .Stress.Scenario "sendasync" }} - command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 7200'] + {{ if eq .Stress.Scenario "batch-sync" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 259200 & python azure_eventhub_consumer_stress_sync.py --duration 259200 '] {{- end -}} - {{ if eq .Stress.Scenario "sendlistasync" }} - command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 7200'] + {{ if eq .Stress.Scenario "bufferedproducerlistsync" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 259200 --buffered_mode & python azure_eventhub_consumer_stress_sync.py --duration 259200 '] {{- end -}} - {{ if eq .Stress.Scenario "sendconsumeasync" }} - command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_async --duration 7200 & python azure_eventhub_consumer_stress_async.py --duration 7200'] + {{ if eq .Stress.Scenario "bufferedproducerasync" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_async --duration 259200 --buffered_mode & python azure_eventhub_consumer_stress_async.py --duration 259200'] {{- end -}} - {{ if eq .Stress.Scenario "sendconsumesync" }} - command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 7200 & python azure_eventhub_consumer_stress_sync.py --duration 7200'] + {{ if eq .Stress.Scenario "bufferedproducerlistasync" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_async --duration 259200 --buffered_mode & python azure_eventhub_consumer_stress_async.py --duration 259200 '] {{- end -}} - {{ if eq .Stress.Scenario "sendlistconsumeasync" }} - command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_async --duration 7200 & python azure_eventhub_consumer_stress_async.py --duration 7200'] + {{ if eq .Stress.Scenario "bufferedproducersync" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 259200 --buffered_mode & python azure_eventhub_consumer_stress_sync.py --duration 259200'] {{- end -}} - {{ if eq .Stress.Scenario "sendlistconsumesync" }} - command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 7200 & python azure_eventhub_consumer_stress_sync.py --duration 7200'] + {{ if eq .Stress.Scenario "syncwebsockets" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_sync --duration 259200 --transport_type 1 & python azure_eventhub_consumer_stress_sync.py --duration 259200 --transport_type 1'] {{- end -}} - {{ if eq .Stress.Scenario "consumeasyncidentity" }} - command: ['bash', '-c', 'python azure_eventhub_consumer_stress_async.py --azure_identity True --duration 7200'] + {{ if eq .Stress.Scenario "asyncwebsockets" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_async --duration 259200 --transport_type 1 & python azure_eventhub_consumer_stress_async.py --duration 259200 --transport_type 1'] {{- end -}} - {{ if eq .Stress.Scenario "consumesyncidentity" }} - command: ['bash', '-c', 'python azure_eventhub_consumer_stress_sync.py --azure_identity True --duration 7200'] + {{ if eq .Stress.Scenario "sync-batch-web" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_sync --duration 259200 --transport_type 1 & python azure_eventhub_consumer_stress_sync.py --duration 259200 --transport_type 1'] + {{- end -}} + + {{ if eq .Stress.Scenario "async-batch-web" }} + command: ['bash', '-c', 'python azure_eventhub_producer_stress.py -m stress_send_list_async --duration 259200 --transport_type 1 & python azure_eventhub_consumer_stress_async.py --duration 259200 --transport_type 1'] {{- end -}} {{- include "stress-test-addons.container-env" . | nindent 6 }} diff --git a/sdk/eventhub/azure-eventhub/stress/values.yaml b/sdk/eventhub/azure-eventhub/stress/values.yaml index e106bba45539..af8bbef1115b 100644 --- a/sdk/eventhub/azure-eventhub/stress/values.yaml +++ b/sdk/eventhub/azure-eventhub/stress/values.yaml @@ -2,14 +2,15 @@ # one for each scenario in the list. The pod spec can then be configured to pass the # scenario name down to the test command, e.g. `command: ["node", "{{ .Scenario }}.js"]` scenarios: -- "identity" -- "sendsync" -- "sendlistsync" -- "sendasync" -- "sendlistasync" -- "sendconsumeasync" -- "sendconsumesync" -- "sendlistconsumeasync" -- "sendlistconsumesync" -- "consumeasyncidentity" -- "consumesyncidentity" \ No newline at end of file +- "event-async" +- "event-sync" +- "batch-async" +- "batch-sync" +- "bufferedproducerlistsync" +- "bufferedproducerasync" +- "bufferedproducerlistasync" +- "bufferedproducersync" +- "syncwebsockets" +- "asyncwebsockets" +- "sync-batch-web" +- "async-batch-web"