diff --git a/.github/workflows/microbenchmarks.yaml b/.github/workflows/microbenchmarks.yaml new file mode 100644 index 0000000000..e3a0b71ba3 --- /dev/null +++ b/.github/workflows/microbenchmarks.yaml @@ -0,0 +1,142 @@ +# Workflow which runs micro benchmarks on nightly basis +# We run it nightly instead of on every PR / merge to master since some of those benchmarks take quite a while. +name: Micro Benchmarks + +on: + pull_request: + types: [opened, reopened, edited] + branches: + # Only for PRs targeting those branches + - master + - v[0-9]+.[0-9]+ + schedule: + - cron: '30 3 * * *' + +jobs: + # Special job which automatically cancels old runs for the same branch, prevents runs for the + # same file set which has already passed, etc. + pre_job: + name: Skip Duplicate Jobs Pre Job + runs-on: ubuntu-latest + outputs: + should_skip: ${{ steps.skip_check.outputs.should_skip }} + steps: + - id: skip_check + uses: fkirc/skip-duplicate-actions@4c656bbdb6906310fa6213604828008bc28fe55d # v3.3.0 + with: + cancel_others: 'true' + github_token: ${{ github.token }} + + micro-benchmarks: + needs: pre_job + # NOTE: We always want to run job on master since we run some additional checks there (code + # coverage, etc) + if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.ref == 'refs/heads/master' }} + name: '${{ matrix.name }} - Python ${{ matrix.python-version }}' + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - name: 'Microbenchmarks' + task: 'micro-benchmarks' + nosetests_node_total: 1 + nosetests_node_index: 0 + python-version: '3.6' + services: + mongo: + image: mongo:4.4 + ports: + - 27017:27017 + + rabbitmq: + image: rabbitmq:3.8-management + options: >- + --name rabbitmq + ports: + - 5671:5671/tcp # AMQP SSL port + - 5672:5672/tcp # AMQP standard port + - 15672:15672/tcp # Management: HTTP, CLI + + env: + TASK: '${{ matrix.task }}' + + NODE_TOTAL: '${{ matrix.nosetests_node_total }}' + NODE_INDEX: '${{ matrix.nosetests_node_index }}' 
+ + COLUMNS: '120' + ST2_CI: 'true' + + # GitHub is juggling how to set vars for multiple shells. Protect our PATH assumptions. + PATH: /home/runner/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: 'Set up Python (${{ matrix.python-version }})' + uses: actions/setup-python@v2 + with: + python-version: '${{ matrix.python-version }}' + - name: Cache Python Dependencies + uses: actions/cache@v2 + with: + path: | + ~/.cache/pip + virtualenv + ~/virtualenv + key: ${{ runner.os }}-python-${{ matrix.python-version }}-${{ hashFiles('requirements.txt', 'test-requirements.txt') }} + restore-keys: | + ${{ runner.os }}-python-${{ matrix.python-version }}- + - name: Cache APT Dependencies + id: cache-apt-deps + uses: actions/cache@v2 + with: + path: | + ~/apt_cache + key: ${{ runner.os }}-apt-v5-${{ hashFiles('scripts/github/apt-packages.txt') }} + restore-keys: | + ${{ runner.os }}-apt-v5- + - name: Install APT Dependencies + env: + CACHE_HIT: ${{steps.cache-apt-deps.outputs.cache-hit}} + run: | + ./scripts/github/install-apt-packages-use-cache.sh + - name: Install virtualenv + run: | + ./scripts/github/install-virtualenv.sh + - name: Install requirements + run: | + ./scripts/ci/install-requirements.sh + - name: Print versions + run: | + ./scripts/ci/print-versions.sh + - name: Run Micro Benchmarks + timeout-minutes: 30 + # use: script -e -c to print colors + run: | + script -e -c "make ${TASK}" && exit 0 + - name: Upload Histograms + uses: actions/upload-artifact@v2 + with: + name: benchmark_histograms + path: benchmark_histograms/ + retention-days: 30 + + slack-notification: + name: Slack notification for failed builds + if: always() + needs: + - micro-benchmarks + runs-on: ubuntu-latest + steps: + - name: Workflow conclusion + # this step creates an environment variable WORKFLOW_CONCLUSION and is the most reliable way to check the status of previous jobs + uses: 
technote-space/workflow-conclusion-action@v2 + - name: CI Run Failure Slack Notification + if: ${{ env.WORKFLOW_CONCLUSION == 'failure' }} + env: + SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} + uses: voxmedia/github-action-slack-notify-build@v1 + with: + channel: development + status: FAILED + color: danger diff --git a/.github/workflows/orquesta-integration-tests.yaml b/.github/workflows/orquesta-integration-tests.yaml index 3f7ea0c80d..36782b2199 100644 --- a/.github/workflows/orquesta-integration-tests.yaml +++ b/.github/workflows/orquesta-integration-tests.yaml @@ -66,36 +66,14 @@ jobs: ports: - 27017:27017 - # In GHA, these services are started first before the code is checked out. - # We use bitnami images to facilitate reconfiguring RabbitMQ during ci-integration tests. - # We rely on custom config and SSL certs that are in the repo. - # Many images require config in env vars (which we can't change during the test job) - # or they require config in entrypoint args (which we can't override for GHA services) - # bitnami builds ways to get config files from mounted volumes. 
rabbitmq: - image: bitnami/rabbitmq:3.8 - volumes: - - /home/runner/rabbitmq_conf:/bitnami/conf # RABBITMQ_MOUNTED_CONF_DIR - env: - # tell bitnami/rabbitmq to enable this by default - RABBITMQ_PLUGINS: rabbitmq_management - RABBITMQ_USERNAME: guest - RABBITMQ_PASSWORD: guest - - # These are strictly docker options, not entrypoint args (GHA restriction) + image: rabbitmq:3.8-management options: >- --name rabbitmq - ports: - # These 6 ports are exposed by bitnami/rabbitmq (see https://www.rabbitmq.com/networking.html#ports) - # host_port:container_port/protocol - 5671:5671/tcp # AMQP SSL port - 5672:5672/tcp # AMQP standard port - 15672:15672/tcp # Management: HTTP, CLI - #- 15671:15671/tcp # Management: SSL port - #- 25672:25672/tcp # inter-node or CLI - #- 4369:4369/tcp # epmd - # # Used for the coordination backend for integration tests # NOTE: To speed things up, we only start redis for integration tests @@ -199,13 +177,6 @@ jobs: run: | echo "$ST2_CI_REPO_PATH" sudo ST2_CI_REPO_PATH="${ST2_CI_REPO_PATH}" scripts/ci/permissions-workaround.sh - - name: Reconfigure RabbitMQ - # bitnami image allows (see bitnami/rabbitmq readme): - # Here we're copying a rabbitmq.config file which won't do anything. - # We need to switch to custom.conf or advanced.config. - timeout-minutes: 2 # may die if rabbitmq fails to start - run: | - ./scripts/github/configure-rabbitmq.sh - name: Print versions run: | ./scripts/ci/print-versions.sh diff --git a/.gitignore b/.gitignore index 9e174a3e48..5b333de168 100644 --- a/.gitignore +++ b/.gitignore @@ -49,6 +49,7 @@ coverage*.xml .tox nosetests.xml htmlcov +benchmark_histograms/ # Mr Developer .idea diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 24d11c77c0..de94c165ec 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -65,6 +65,19 @@ Added Contributed by @Kami. +* Add support for compressing the payloads which are sent over the message bus. 
Compression is + disabled by default and user can enable it by setting ``messaging.compression`` config option + to one of the following values: ``zstd``, ``lzma``, ``bz2``, ``gzip``. + + In most cases we recommend using ``zstd`` (zstandard) since it offers best trade off between + compression ratio and number of CPU cycles spent for compression and decompression. + + How this will affect the deployment and throughput is very much user specific (workflow and + resources available). It may make sense to enable it when generic action trigger is enabled + and when working with executions with large textual results. #5241 + + Contributed by @Kami. + Changed ~~~~~~~ diff --git a/Makefile b/Makefile index 1417024255..c6ce74e937 100644 --- a/Makefile +++ b/Makefile @@ -558,15 +558,19 @@ micro-benchmarks: requirements .micro-benchmarks @echo @echo "==================== micro-benchmarks ====================" @echo - . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_large_execution" - . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_read_large_execution" - . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_multiple_fields" - . 
$(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_large_string_value" - . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_read_large_string_value" - . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:dict_keys_count_and_depth -s -v st2common/benchmarks/micro/test_fast_deepcopy.py -k "test_fast_deepcopy_with_dict_values" - . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_fast_deepcopy.py -k "test_fast_deepcopy_with_json_fixture_file" - . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file,param:indent_sort_keys_tuple -s -v st2common/benchmarks/micro/test_json_serialization_and_deserialization.py -k "test_json_dumps" - . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_json_serialization_and_deserialization.py -k "test_json_loads" + . 
$(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_large_execution" + . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_read_large_execution" + . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_multiple_fields" + . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_large_string_value" + . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_read_large_string_value" + . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_transport_compression.py -k "test_save_execution" + . 
$(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_transport_compression.py -k "test_read_execution" + . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:dict_keys_count_and_depth -s -v st2common/benchmarks/micro/test_fast_deepcopy.py -k "test_fast_deepcopy_with_dict_values" + . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_fast_deepcopy.py -k "test_fast_deepcopy_with_json_fixture_file" + . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file,param:indent_sort_keys_tuple -s -v st2common/benchmarks/micro/test_json_serialization_and_deserialization.py -k "test_json_dumps" + . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_json_serialization_and_deserialization.py -k "test_json_loads" + . 
$(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_publisher_compression.py -k "test_pickled_object_compression" + . $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_publisher_compression.py -k "test_pickled_object_compression_publish" .PHONY: .cleanmongodb .cleanmongodb: diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 97885cc5a7..62e1e00f6d 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -189,6 +189,8 @@ redirect_stderr = False [messaging] # URL of all the nodes in a messaging service cluster. cluster_urls = # comma separated list allowed here. +# Compression algorithm to use for compressing the payloads which are sent over the message bus. Valid values include: zstd, lzma, bz2, gzip. Defaults to no compression. +compression = None # How many times should we retry connection before failing. connection_retries = 10 # How long should we wait between connection retries. 
diff --git a/scripts/github/configure-rabbitmq.sh b/scripts/github/configure-rabbitmq.sh index 6c7b0ba372..336ee86185 100755 --- a/scripts/github/configure-rabbitmq.sh +++ b/scripts/github/configure-rabbitmq.sh @@ -27,4 +27,5 @@ echo sudo wget http://guest:guest@localhost:15672/cli/rabbitmqadmin -O /usr/local/bin/rabbitmqadmin sudo chmod +x /usr/local/bin/rabbitmqadmin # print logs from stdout (RABBITMQ_LOGS=-) -docker logs --tail=20 rabbitmq +docker logs --tail=100 rabbitmq +# TODO: Fail here if service fails to start and exit with -2 diff --git a/st2api/st2api/cmd/api.py b/st2api/st2api/cmd/api.py index 1cf01d0544..e8cc71e875 100644 --- a/st2api/st2api/cmd/api.py +++ b/st2api/st2api/cmd/api.py @@ -33,9 +33,9 @@ from st2common.service_setup import teardown as common_teardown from st2api import config -config.register_opts() -from st2api import app +config.register_opts(ignore_errors=True) +from st2api import app from st2api.validation import validate_rbac_is_correctly_configured __all__ = ["main"] diff --git a/st2auth/st2auth/cmd/api.py b/st2auth/st2auth/cmd/api.py index 4c52f2649c..e10b774872 100644 --- a/st2auth/st2auth/cmd/api.py +++ b/st2auth/st2auth/cmd/api.py @@ -29,7 +29,7 @@ from st2common.service_setup import teardown as common_teardown from st2auth import config -config.register_opts() +config.register_opts(ignore_errors=True) from st2auth import app from st2auth.validation import validate_auth_backend_is_correctly_configured diff --git a/st2common/benchmarks/micro/common.py b/st2common/benchmarks/micro/common.py new file mode 100644 index 0000000000..804e09a2ac --- /dev/null +++ b/st2common/benchmarks/micro/common.py @@ -0,0 +1,66 @@ +# Copyright 2021 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import pytest + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +FIXTURES_DIR = os.path.abspath(os.path.join(BASE_DIR, "../fixtures/json")) + + +PYTEST_FIXTURE_FILE_PARAM_NO_8MB_DECORATOR = pytest.mark.parametrize( + "fixture_file", + [ + "tiny_1.json", + "json_61kb.json", + "json_647kb.json", + "json_4mb.json", + "json_4mb_single_large_field.json", + ], + ids=[ + "tiny_1", + "json_61kb", + "json_647kb", + "json_4mb", + "json_4mb_single_large_field", + ], +) + +# NOTE: On CI we skip 8 MB fixture file since it's very slow and substantially slows down that +# workflow. +ST2_CI = os.environ.get("ST2_CI", "false").lower() == "true" + +if ST2_CI: + PYTEST_FIXTURE_FILE_PARAM_DECORATOR = PYTEST_FIXTURE_FILE_PARAM_NO_8MB_DECORATOR +else: + PYTEST_FIXTURE_FILE_PARAM_DECORATOR = pytest.mark.parametrize( + "fixture_file", + [ + "tiny_1.json", + "json_61kb.json", + "json_647kb.json", + "json_4mb.json", + "json_8mb.json", + "json_4mb_single_large_field.json", + ], + ids=[ + "tiny_1", + "json_61kb", + "json_647kb", + "json_4mb", + "json_8mb", + "json_4mb_single_large_field", + ], + ) diff --git a/st2common/benchmarks/micro/test_fast_deepcopy.py b/st2common/benchmarks/micro/test_fast_deepcopy.py index 90b2140d16..d95a7253f1 100644 --- a/st2common/benchmarks/micro/test_fast_deepcopy.py +++ b/st2common/benchmarks/micro/test_fast_deepcopy.py @@ -1,5 +1,4 @@ -# Copyright 2020 The StackStorm Authors. -# Copyright 2019 Extreme Networks, Inc. +# Copyright 2021 The StackStorm Authors. 
# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,6 +20,10 @@ # TODO: Also use actual orquesta context and execution fixture files which contain real life data # with large text strings, different value types, etc. +from st2common.util.monkey_patch import monkey_patch + +monkey_patch() + import os import copy import random @@ -31,8 +34,7 @@ import ujson import orjson -BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -FIXTURES_DIR = os.path.abspath(os.path.join(BASE_DIR, "../fixtures/json")) +from common import FIXTURES_DIR def generate_random_dict(keys_count=10, depth=1): @@ -103,7 +105,7 @@ def run_benchmark(): else: raise ValueError("Invalid implementation: %s" % (implementation)) - result = benchmark.pedantic(run_benchmark, iterations=50, rounds=50) + result = benchmark(run_benchmark) assert result == data, "Output is not the same as the input" @@ -148,5 +150,5 @@ def run_benchmark(): else: raise ValueError("Invalid implementation: %s" % (implementation)) - result = benchmark.pedantic(run_benchmark, iterations=5, rounds=5) + result = benchmark(run_benchmark) assert result == data, "Output is not the same as the input" diff --git a/st2common/benchmarks/micro/test_json_serialization_and_deserialization.py b/st2common/benchmarks/micro/test_json_serialization_and_deserialization.py index 38a30e27b1..e90626ae21 100644 --- a/st2common/benchmarks/micro/test_json_serialization_and_deserialization.py +++ b/st2common/benchmarks/micro/test_json_serialization_and_deserialization.py @@ -1,5 +1,4 @@ -# Copyright 2020 The StackStorm Authors. -# Copyright 2019 Extreme Networks, Inc. +# Copyright 2021 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,6 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from st2common.util.monkey_patch import monkey_patch + +monkey_patch() + import os import json import simplejson @@ -23,8 +26,7 @@ from st2common.util.jsonify import json_encode_orjson -BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -FIXTURES_DIR = os.path.abspath(os.path.join(BASE_DIR, "../fixtures/json")) +from common import FIXTURES_DIR @pytest.mark.parametrize( @@ -86,7 +88,7 @@ def run_benchmark(): else: raise ValueError("Invalid implementation: %s" % (implementation)) - result = benchmark.pedantic(run_benchmark, iterations=5, rounds=5) + result = benchmark(run_benchmark) assert len(result) >= 40000 @@ -128,5 +130,5 @@ def run_benchmark(): else: raise ValueError("Invalid implementation: %s" % (implementation)) - result = benchmark.pedantic(run_benchmark, iterations=10, rounds=10) + result = benchmark(run_benchmark) assert result == content_loaded diff --git a/st2common/benchmarks/micro/test_mongo_escape_and_unescape.py b/st2common/benchmarks/micro/test_mongo_escape_and_unescape.py index d3e3eb8ec5..0e70e9320a 100644 --- a/st2common/benchmarks/micro/test_mongo_escape_and_unescape.py +++ b/st2common/benchmarks/micro/test_mongo_escape_and_unescape.py @@ -1,5 +1,4 @@ -# Copyright 2020 The StackStorm Authors. -# Copyright 2019 Extreme Networks, Inc. +# Copyright 2021 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,10 +15,14 @@ """ Micro benchmarks which benchmark our mongo escape and unescape function. -NOTE: We utiliz JSON fixture files which also contain values even though escaping only operates +NOTE: We utilize JSON fixture files which also contain values even though escaping only operates on the item keys. 
""" +from st2common.util.monkey_patch import monkey_patch + +monkey_patch() + import os import json @@ -27,8 +30,7 @@ from st2common.util import mongoescape -BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -FIXTURES_DIR = os.path.abspath(os.path.join(BASE_DIR, "../fixtures/json")) +from common import FIXTURES_DIR @pytest.mark.parametrize( @@ -61,7 +63,7 @@ def run_benchmark(): result = mongoescape.escape_chars(data) return result - escaped_data = benchmark.pedantic(run_benchmark, iterations=10, rounds=10) + escaped_data = benchmark(run_benchmark) unescaped_data = mongoescape.unescape_chars(escaped_data) assert escaped_data != data assert unescaped_data == data @@ -98,6 +100,6 @@ def run_benchmark(): result = mongoescape.unescape_chars(escaped_data) return result - unescaped_data = benchmark.pedantic(run_benchmark, iterations=10, rounds=10) + unescaped_data = benchmark(run_benchmark) escaped_data = mongoescape.escape_chars(escaped_data) assert unescaped_data != escaped_data diff --git a/st2common/benchmarks/micro/test_mongo_field_types.py b/st2common/benchmarks/micro/test_mongo_field_types.py index 9544e623f3..65cffaff22 100644 --- a/st2common/benchmarks/micro/test_mongo_field_types.py +++ b/st2common/benchmarks/micro/test_mongo_field_types.py @@ -1,5 +1,4 @@ -# Copyright 2020 The StackStorm Authors. -# Copyright 2019 Extreme Networks, Inc. +# Copyright 2021 The StackStorm Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -36,6 +35,10 @@ implement in a backward compatible manner. 
""" +from st2common.util.monkey_patch import monkey_patch + +monkey_patch() + from typing import Type import os @@ -50,11 +53,12 @@ from st2common.persistence.liveaction import LiveAction from st2common.fields import JSONDictField -BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -FIXTURES_DIR = os.path.abspath(os.path.join(BASE_DIR, "../fixtures/json")) +from common import FIXTURES_DIR +from common import PYTEST_FIXTURE_FILE_PARAM_DECORATOR +from common import PYTEST_FIXTURE_FILE_PARAM_NO_8MB_DECORATOR -# Needeed so we can subclass it +# Needed so we can subclass it LiveActionDB._meta["allow_inheritance"] = True @@ -134,25 +138,7 @@ def get_model_class_for_approach(approach: str) -> Type[LiveActionDB]: return model_cls -@pytest.mark.parametrize( - "fixture_file", - [ - "tiny_1.json", - "json_61kb.json", - "json_647kb.json", - "json_4mb.json", - "json_8mb.json", - "json_4mb_single_large_field.json", - ], - ids=[ - "tiny_1", - "json_61kb", - "json_647kb", - "json_4mb", - "json_8mb", - "json_4mb_single_large_field", - ], -) +@PYTEST_FIXTURE_FILE_PARAM_DECORATOR @pytest.mark.parametrize( "approach", [ @@ -190,30 +176,14 @@ def run_benchmark(): inserted_live_action_db = LiveAction.add_or_update(live_action_db) return inserted_live_action_db - inserted_live_action_db = benchmark.pedantic(run_benchmark, iterations=3, rounds=3) + inserted_live_action_db = benchmark(run_benchmark) retrieved_live_action_db = LiveAction.get_by_id(inserted_live_action_db.id) # Assert that result is correctly converted back to dict on retrieval assert inserted_live_action_db.result == data assert inserted_live_action_db == retrieved_live_action_db -@pytest.mark.parametrize( - "fixture_file", - [ - "tiny_1.json", - "json_61kb.json", - "json_647kb.json", - "json_4mb.json", - "json_4mb_single_large_field.json", - ], - ids=[ - "tiny_1", - "json_61kb", - "json_647kb", - "json_4mb", - "json_4mb_single_large_field", - ], -) +@PYTEST_FIXTURE_FILE_PARAM_NO_8MB_DECORATOR @pytest.mark.parametrize( 
"approach", [ @@ -257,7 +227,7 @@ def run_benchmark(): inserted_live_action_db = LiveAction.add_or_update(live_action_db) return inserted_live_action_db - inserted_live_action_db = benchmark.pedantic(run_benchmark, iterations=3, rounds=3) + inserted_live_action_db = benchmark(run_benchmark) retrieved_live_action_db = LiveAction.get_by_id(inserted_live_action_db.id) # Assert that result is correctly converted back to dict on retrieval assert inserted_live_action_db.field1 == data @@ -266,25 +236,7 @@ def run_benchmark(): assert inserted_live_action_db == retrieved_live_action_db -@pytest.mark.parametrize( - "fixture_file", - [ - "tiny_1.json", - "json_61kb.json", - "json_647kb.json", - "json_4mb.json", - "json_8mb.json", - "json_4mb_single_large_field.json", - ], - ids=[ - "tiny_1", - "json_61kb", - "json_647kb", - "json_4mb", - "json_8mb", - "json_4mb_single_large_field", - ], -) +@PYTEST_FIXTURE_FILE_PARAM_DECORATOR @pytest.mark.parametrize( "approach", [ @@ -325,29 +277,13 @@ def run_benchmark(): retrieved_live_action_db = LiveAction.get_by_id(inserted_live_action_db.id) return retrieved_live_action_db - retrieved_live_action_db = benchmark.pedantic(run_benchmark, iterations=3, rounds=3) + retrieved_live_action_db = benchmark(run_benchmark) # Assert that result is correctly converted back to dict on retrieval assert retrieved_live_action_db == inserted_live_action_db assert retrieved_live_action_db.result == data -@pytest.mark.parametrize( - "fixture_file", - [ - "tiny_1.json", - "json_61kb.json", - "json_647kb.json", - "json_4mb.json", - "json_4mb_single_large_field.json", - ], - ids=[ - "tiny_1", - "json_61kb", - "json_647kb", - "json_4mb", - "json_4mb_single_large_field", - ], -) +@PYTEST_FIXTURE_FILE_PARAM_DECORATOR @pytest.mark.parametrize( "approach", [ @@ -384,29 +320,11 @@ def run_benchmark(): inserted_live_action_db = LiveAction.add_or_update(live_action_db) return inserted_live_action_db - inserted_live_action_db = benchmark.pedantic( - run_benchmark, 
iterations=10, rounds=10 - ) + inserted_live_action_db = benchmark(run_benchmark) assert bool(inserted_live_action_db.value) -@pytest.mark.parametrize( - "fixture_file", - [ - "tiny_1.json", - "json_61kb.json", - "json_647kb.json", - "json_4mb.json", - "json_4mb_single_large_field.json", - ], - ids=[ - "tiny_1", - "json_61kb", - "json_647kb", - "json_4mb", - "json_4mb_single_large_field", - ], -) +@PYTEST_FIXTURE_FILE_PARAM_DECORATOR @pytest.mark.parametrize( "approach", [ @@ -418,7 +336,7 @@ def run_benchmark(): "binary_field", ], ) -@pytest.mark.benchmark(group="test_model_save") +@pytest.mark.benchmark(group="test_model_read") def test_read_large_string_value(benchmark, fixture_file: str, approach: str) -> None: with open(os.path.join(FIXTURES_DIR, fixture_file), "rb") as fp: content = fp.read() @@ -445,8 +363,6 @@ def run_benchmark(): retrieved_live_action_db = LiveAction.get_by_id(inserted_live_action_db.id) return retrieved_live_action_db - retrieved_live_action_db = benchmark.pedantic( - run_benchmark, iterations=10, rounds=10 - ) + retrieved_live_action_db = benchmark(run_benchmark) assert retrieved_live_action_db == inserted_live_action_db assert retrieved_live_action_db.value == content diff --git a/st2common/benchmarks/micro/test_mongo_transport_compression.py b/st2common/benchmarks/micro/test_mongo_transport_compression.py new file mode 100644 index 0000000000..27368fbb76 --- /dev/null +++ b/st2common/benchmarks/micro/test_mongo_transport_compression.py @@ -0,0 +1,121 @@ +# Copyright 2021 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Benchmarks which measure how much overhead enabling transport / network level MongoDB compression +adds. +""" + +from st2common.util.monkey_patch import monkey_patch + +monkey_patch() + +import os + +import pytest + +from oslo_config import cfg +from mongoengine.connection import disconnect + +from st2common.service_setup import db_setup +from st2common.models.db.liveaction import LiveActionDB +from st2common.persistence.liveaction import LiveAction + +from common import FIXTURES_DIR +from common import PYTEST_FIXTURE_FILE_PARAM_DECORATOR + + +@PYTEST_FIXTURE_FILE_PARAM_DECORATOR +@pytest.mark.parametrize( + "compression", + [ + None, + "zstd", + ], + ids=[ + "none", + "zstd", + ], +) +@pytest.mark.benchmark(group="test_model_save") +def test_save_execution(benchmark, fixture_file: str, compression): + with open(os.path.join(FIXTURES_DIR, fixture_file), "rb") as fp: + content = fp.read() + + cfg.CONF.set_override(name="compressors", group="database", override=compression) + + # NOTE: It's important we correctly reestablish connection before each setting change + disconnect() + connection = db_setup() + + if compression is None: + assert "compressors" not in str(connection) + elif compression == "zstd": + assert "compressors=['zstd']" in str(connection) + + def run_benchmark(): + live_action_db = LiveActionDB() + live_action_db.status = "succeeded" + live_action_db.action = "core.local" + live_action_db.result = content + + inserted_live_action_db = LiveAction.add_or_update(live_action_db) + return inserted_live_action_db + + inserted_live_action_db = benchmark(run_benchmark) + assert inserted_live_action_db.result == content + + +@PYTEST_FIXTURE_FILE_PARAM_DECORATOR +@pytest.mark.parametrize( + "compression", + [ + None, + "zstd", + ], + ids=[ + "none", + "zstd", + ], +) +@pytest.mark.benchmark(group="test_model_read") +def test_read_execution(benchmark, 
fixture_file: str, compression): + with open(os.path.join(FIXTURES_DIR, fixture_file), "rb") as fp: + content = fp.read() + + cfg.CONF.set_override(name="compressors", group="database", override=compression) + + # NOTE: It's important we correctly reestablish connection before each setting change + disconnect() + connection = db_setup() + + if compression is None: + assert "compressors" not in str(connection) + elif compression == "zstd": + assert "compressors=['zstd']" in str(connection) + + live_action_db = LiveActionDB() + live_action_db.status = "succeeded" + live_action_db.action = "core.local" + live_action_db.result = content + + inserted_live_action_db = LiveAction.add_or_update(live_action_db) + + def run_benchmark(): + retrieved_live_action_db = LiveAction.get_by_id(inserted_live_action_db.id) + return retrieved_live_action_db + + retrieved_live_action_db = benchmark(run_benchmark) + # Assert that result is correctly converted back to dict on retrieval + assert retrieved_live_action_db == inserted_live_action_db diff --git a/st2common/benchmarks/micro/test_publisher_compression.py b/st2common/benchmarks/micro/test_publisher_compression.py new file mode 100644 index 0000000000..51d13fd8a4 --- /dev/null +++ b/st2common/benchmarks/micro/test_publisher_compression.py @@ -0,0 +1,114 @@ +# Copyright 2021 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from st2common.util.monkey_patch import monkey_patch + +monkey_patch() + +from kombu import Exchange +from kombu.serialization import pickle + +import os +import json + +import pytest +import zstandard as zstd + +from st2common.models.db.liveaction import LiveActionDB +from st2common.transport import publishers + +from common import FIXTURES_DIR +from common import PYTEST_FIXTURE_FILE_PARAM_DECORATOR + + +@PYTEST_FIXTURE_FILE_PARAM_DECORATOR +@pytest.mark.parametrize( + "algorithm", + [ + "none", + "zstandard", + ], + ids=[ + "none", + "zstandard", + ], +) +@pytest.mark.benchmark(group="no_publish") +def test_pickled_object_compression( + benchmark, fixture_file: str, algorithm: str +) -> None: + with open(os.path.join(FIXTURES_DIR, fixture_file), "r") as fp: + content = fp.read() + + data = json.loads(content) + + def run_benchmark(): + live_action_db = LiveActionDB() + live_action_db.status = "succeeded" + live_action_db.action = "core.local" + live_action_db.result = data + + serialized = pickle.dumps(live_action_db) + + if algorithm == "zstandard": + c = zstd.ZstdCompressor() + serialized = c.compress(serialized) + + return serialized + + result = benchmark.pedantic(run_benchmark, iterations=5, rounds=5) + assert isinstance(result, bytes) + + +@PYTEST_FIXTURE_FILE_PARAM_DECORATOR +@pytest.mark.parametrize( + "algorithm", + [ + "none", + "zstandard", + ], + ids=[ + "none", + "zstandard", + ], +) +@pytest.mark.benchmark(group="publish") +def test_pickled_object_compression_publish( + benchmark, fixture_file: str, algorithm: str +) -> None: + with open(os.path.join(FIXTURES_DIR, fixture_file), "r") as fp: + content = fp.read() + + data = json.loads(content) + + publisher = publishers.PoolPublisher() + + exchange = Exchange("st2.execution.test", type="topic") + + if algorithm == "zstandard": + compression = "zstd" + else: + compression = None + + def run_benchmark(): + live_action_db = LiveActionDB() + live_action_db.status = "succeeded" + live_action_db.action 
= "core.local" + live_action_db.result = data + + publisher.publish( + payload=live_action_db, exchange=exchange, compression=compression + ) + + benchmark.pedantic(run_benchmark, iterations=5, rounds=5) diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index 9d39f86d64..cf2ada4ee3 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -318,6 +318,13 @@ def register_opts(ignore_errors=False): default=None, help="Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).", ), + cfg.StrOpt( + "compression", + default=None, + help="Compression algorithm to use for compressing the payloads which are sent over " + "the message bus. Valid values include: zstd, lzma, bz2, gzip. Defaults to no " + "compression.", + ), ] do_register_opts(messaging_opts, "messaging", ignore_errors) diff --git a/st2common/st2common/transport/publishers.py b/st2common/st2common/transport/publishers.py index 202220acb1..ddf5d2f8c9 100644 --- a/st2common/st2common/transport/publishers.py +++ b/st2common/st2common/transport/publishers.py @@ -18,6 +18,7 @@ import copy from kombu.messaging import Producer +from oslo_config import cfg from st2common import log as logging from st2common.metrics.base import Timer @@ -56,7 +57,9 @@ def __init__(self, urls=None): def errback(self, exc, interval): LOG.error("Rabbitmq connection error: %s", exc.message, exc_info=False) - def publish(self, payload, exchange, routing_key=""): + def publish(self, payload, exchange, routing_key="", compression=None): + compression = compression or cfg.CONF.messaging.compression + with Timer(key="amqp.pool_publisher.publish_with_retries." 
+ exchange.name): with self.pool.acquire(block=True) as connection: retry_wrapper = ConnectionRetryWrapper( @@ -74,6 +77,7 @@ def do_publish(connection, channel): "exchange": exchange, "routing_key": routing_key, "serializer": "pickle", + "compression": compression, "content_encoding": "utf-8", } diff --git a/st2common/tests/unit/test_transport.py b/st2common/tests/unit/test_transport.py index 75e35ae2c9..c8148f686e 100644 --- a/st2common/tests/unit/test_transport.py +++ b/st2common/tests/unit/test_transport.py @@ -13,16 +13,137 @@ # See the License for the specific language governing permissions and # limitations under the License. +from st2common.util.monkey_patch import monkey_patch + +monkey_patch() + import ssl +import random import unittest2 +import eventlet + +from bson.objectid import ObjectId +from kombu.mixins import ConsumerMixin +from kombu import Exchange +from kombu import Queue +from oslo_config import cfg +from st2common.transport.publishers import PoolPublisher from st2common.transport.utils import _get_ssl_kwargs +from st2common.transport import utils as transport_utils +from st2common.models.db.liveaction import LiveActionDB __all__ = ["TransportUtilsTestCase"] +class QueueConsumer(ConsumerMixin): + def __init__(self, connection, queue): + self.connection = connection + self.queue = queue + + self.received_messages = [] + + def get_consumers(self, Consumer, channel): + return [ + Consumer( + queues=[self.queue], accept=["pickle"], callbacks=[self.process_task] + ) + ] + + def process_task(self, body, message): + self.received_messages.append((body, message)) + message.ack() + + class TransportUtilsTestCase(unittest2.TestCase): + def tearDown(self): + super(TransportUtilsTestCase, self).tearDown() + cfg.CONF.set_override(name="compression", group="messaging", override=None) + + def test_publish_compression(self): + live_action_db = LiveActionDB() + live_action_db.id = ObjectId() + live_action_db.status = "succeeded" + live_action_db.action = 
"core.local" + live_action_db.result = {"foo": "bar"} + + exchange = Exchange("st2.execution.test", type="topic") + queue_name = "test-" + str(random.randint(1, 10000)) + queue = Queue( + name=queue_name, exchange=exchange, routing_key="#", auto_delete=True + ) + publisher = PoolPublisher() + + with transport_utils.get_connection() as connection: + connection.connect() + watcher = QueueConsumer(connection=connection, queue=queue) + watcher_thread = eventlet.greenthread.spawn(watcher.run) + + # Give it some time to start up since we are publishing on a new queue + eventlet.sleep(0.5) + + self.assertEqual(len(watcher.received_messages), 0) + + # 1. Verify compression is off as a default + publisher.publish(payload=live_action_db, exchange=exchange) + eventlet.sleep(0.2) + + self.assertEqual(len(watcher.received_messages), 1) + self.assertEqual( + watcher.received_messages[0][1].properties["content_type"], + "application/x-python-serialize", + ) + self.assertEqual( + watcher.received_messages[0][1].properties["content_encoding"], "binary" + ) + self.assertEqual( + watcher.received_messages[0][1].properties["application_headers"], {} + ) + self.assertEqual(watcher.received_messages[0][0].id, live_action_db.id) + + # 2. Verify config level option is used + cfg.CONF.set_override(name="compression", group="messaging", override="zstd") + publisher.publish(payload=live_action_db, exchange=exchange) + + eventlet.sleep(0.2) + + self.assertEqual(len(watcher.received_messages), 2) + self.assertEqual( + watcher.received_messages[1][1].properties["content_type"], + "application/x-python-serialize", + ) + self.assertEqual( + watcher.received_messages[1][1].properties["content_encoding"], "binary" + ) + self.assertEqual( + watcher.received_messages[1][1].properties["application_headers"], + {"compression": "application/zstd"}, + ) + self.assertEqual(watcher.received_messages[1][0].id, live_action_db.id) + + # 2. 
Verify argument level option is used and has precedence over config one + cfg.CONF.set_override(name="compression", group="messaging", override="zstd") + publisher.publish(payload=live_action_db, exchange=exchange, compression="gzip") + + eventlet.sleep(0.2) + + self.assertEqual(len(watcher.received_messages), 3) + self.assertEqual( + watcher.received_messages[2][1].properties["content_type"], + "application/x-python-serialize", + ) + self.assertEqual( + watcher.received_messages[2][1].properties["content_encoding"], "binary" + ) + self.assertEqual( + watcher.received_messages[2][1].properties["application_headers"], + {"compression": "application/x-gzip"}, + ) + self.assertEqual(watcher.received_messages[2][0].id, live_action_db.id) + + watcher_thread.kill() + def test_get_ssl_kwargs(self): # 1. No SSL kwargs provided ssl_kwargs = _get_ssl_kwargs() diff --git a/st2stream/st2stream/cmd/api.py b/st2stream/st2stream/cmd/api.py index b4ce963ea5..35f0c5b3bd 100644 --- a/st2stream/st2stream/cmd/api.py +++ b/st2stream/st2stream/cmd/api.py @@ -32,7 +32,8 @@ from st2stream.signal_handlers import register_stream_signal_handlers from st2stream import config -config.register_opts() +config.register_opts(ignore_errors=True) + from st2stream import app __all__ = ["main"] diff --git a/test-requirements.txt b/test-requirements.txt index e7eb4efb91..54295c2f3f 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -38,6 +38,6 @@ prance==0.15.0 pip-tools>=5.4,<6.1 pytest==5.4.3 -pytest-benchmark==3.2.3 +pytest-benchmark[histogram]==3.2.3 # zstandard is used for micro benchmarks zstandard==0.15.2 # ujson is used for micro benchmarks diff --git a/tools/config_gen.py b/tools/config_gen.py index 48ce5d205a..fc92195e9c 100755 --- a/tools/config_gen.py +++ b/tools/config_gen.py @@ -182,7 +182,7 @@ def main(args): opt_groups = {} for config in CONFIGS: mod = _import_config(config) - mod.register_opts(ignore_errors=True) _read_current_config(opt_groups) 
_clear_config() _read_groups(opt_groups)