Skip to content

Commit

Permalink
Merge pull request #5241 from StackStorm/message_bus_compression_support
Browse files Browse the repository at this point in the history
Add support for compressing payloads sent over the message bus
  • Loading branch information
Kami authored Apr 22, 2021
2 parents 3734d75 + d22c4ae commit 78ae28e
Show file tree
Hide file tree
Showing 22 changed files with 660 additions and 169 deletions.
142 changes: 142 additions & 0 deletions .github/workflows/microbenchmarks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Workflow which runs micro benchmarks on a nightly basis.
# We run it nightly instead of on every merge to master since some of those benchmarks take quite a
# while. NOTE: It is also triggered for pull requests which target master and version branches.
name: Micro Benchmarks

on:
  pull_request:
    # NOTE: The activity type filter key is "types" (plural). A misspelled "type" key does not
    # restrict the activity types. "synchronize" (new commits pushed to an open PR) is not
    # listed, so pushing to an open PR does not re-trigger this workflow - add it if desired.
    types: [opened, reopened, edited]
    branches:
      # Only for PRs targeting those branches
      - master
      - v[0-9]+.[0-9]+
  schedule:
    # Nightly run at 03:30 (GitHub evaluates cron schedules in UTC)
    - cron: '30 3 * * *'

jobs:
  # Special job which automatically cancels old runs for the same branch, prevents runs for the
  # same file set which has already passed, etc.
  pre_job:
    name: Skip Duplicate Jobs Pre Job
    runs-on: ubuntu-latest
    outputs:
      should_skip: ${{ steps.skip_check.outputs.should_skip }}
    steps:
      - id: skip_check
        uses: fkirc/skip-duplicate-actions@4c656bbdb6906310fa6213604828008bc28fe55d  # v3.3.0
        with:
          cancel_others: 'true'
          github_token: ${{ github.token }}

  # Runs the "micro-benchmarks" make target against MongoDB and RabbitMQ service containers and
  # uploads the generated histograms as a build artifact.
  micro-benchmarks:
    needs: pre_job
    # NOTE: We always want to run job on master since we run some additional checks there (code
    # coverage, etc)
    if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.ref == 'refs/heads/master' }}
    name: '${{ matrix.name }} - Python ${{ matrix.python-version }}'
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        include:
          - name: 'Microbenchmarks'
            task: 'micro-benchmarks'
            nosetests_node_total: 1
            nosetests_node_index: 0
            python-version: '3.6'
    services:
      mongo:
        image: mongo:4.4
        ports:
          - 27017:27017

      rabbitmq:
        image: rabbitmq:3.8-management
        # Fixed container name so other steps / scripts can refer to the container by name
        options: >-
          --name rabbitmq
        ports:
          - 5671:5671/tcp  # AMQP SSL port
          - 5672:5672/tcp  # AMQP standard port
          - 15672:15672/tcp  # Management: HTTP, CLI

    env:
      # Make target which is executed by the "Run Micro Benchmarks" step
      TASK: '${{ matrix.task }}'

      NODE_TOTAL: '${{ matrix.nosetests_node_total }}'
      NODE_INDEX: '${{ matrix.nosetests_node_index }}'

      COLUMNS: '120'
      ST2_CI: 'true'

      # GitHub is juggling how to set vars for multiple shells. Protect our PATH assumptions.
      PATH: /home/runner/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - name: 'Set up Python (${{ matrix.python-version }})'
        uses: actions/setup-python@v2
        with:
          python-version: '${{ matrix.python-version }}'
      - name: Cache Python Dependencies
        uses: actions/cache@v2
        with:
          path: |
            ~/.cache/pip
            virtualenv
            ~/virtualenv
          key: ${{ runner.os }}-python-${{ matrix.python-version }}-${{ hashFiles('requirements.txt', 'test-requirements.txt') }}
          # NOTE: The prefix must use the same matrix key as "key" above ("python-version"),
          # otherwise the fallback prefix never matches any previously saved cache entry.
          restore-keys: |
            ${{ runner.os }}-python-${{ matrix.python-version }}-
      - name: Cache APT Dependencies
        id: cache-apt-deps
        uses: actions/cache@v2
        with:
          path: |
            ~/apt_cache
          key: ${{ runner.os }}-apt-v5-${{ hashFiles('scripts/github/apt-packages.txt') }}
          restore-keys: |
            ${{ runner.os }}-apt-v5-
      - name: Install APT Dependencies
        env:
          # Exposes the result of the "Cache APT Dependencies" step to the install script
          CACHE_HIT: ${{steps.cache-apt-deps.outputs.cache-hit}}
        run: |
          ./scripts/github/install-apt-packages-use-cache.sh
      - name: Install virtualenv
        run: |
          ./scripts/github/install-virtualenv.sh
      - name: Install requirements
        run: |
          ./scripts/ci/install-requirements.sh
      - name: Print versions
        run: |
          ./scripts/ci/print-versions.sh
      - name: Run Micro Benchmarks
        timeout-minutes: 30
        # use: script -e -c to print colors
        run: |
          script -e -c "make ${TASK}" && exit 0
      - name: Upload Histograms
        uses: actions/upload-artifact@v2
        with:
          name: benchmark_histograms
          path: benchmark_histograms/
          retention-days: 30

  slack-notification:
    name: Slack notification for failed builds
    # Run even when the jobs listed under "needs" have failed so the failure can be reported
    if: always()
    needs:
      - micro-benchmarks
    runs-on: ubuntu-latest
    steps:
      - name: Workflow conclusion
        # this step creates an environment variable WORKFLOW_CONCLUSION and is the most reliable way to check the status of previous jobs
        uses: technote-space/workflow-conclusion-action@v2
      - name: CI Run Failure Slack Notification
        if: ${{ env.WORKFLOW_CONCLUSION == 'failure' }}
        env:
          SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
        uses: voxmedia/github-action-slack-notify-build@v1
        with:
          channel: development
          status: FAILED
          color: danger
31 changes: 1 addition & 30 deletions .github/workflows/orquesta-integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,36 +66,14 @@ jobs:
ports:
- 27017:27017

# In GHA, these services are started first before the code is checked out.
# We use bitnami images to facilitate reconfiguring RabbitMQ during ci-integration tests.
# We rely on custom config and SSL certs that are in the repo.
# Many images require config in env vars (which we can't change during the test job)
# or they require config in entrypoint args (which we can't override for GHA services)
# bitnami builds ways to get config files from mounted volumes.
rabbitmq:
image: bitnami/rabbitmq:3.8
volumes:
- /home/runner/rabbitmq_conf:/bitnami/conf # RABBITMQ_MOUNTED_CONF_DIR
env:
# tell bitnami/rabbitmq to enable this by default
RABBITMQ_PLUGINS: rabbitmq_management
RABBITMQ_USERNAME: guest
RABBITMQ_PASSWORD: guest

# These are strictly docker options, not entrypoint args (GHA restriction)
image: rabbitmq:3.8-management
options: >-
--name rabbitmq
ports:
# These 6 ports are exposed by bitnami/rabbitmq (see https://www.rabbitmq.com/networking.html#ports)
# host_port:container_port/protocol
- 5671:5671/tcp # AMQP SSL port
- 5672:5672/tcp # AMQP standard port
- 15672:15672/tcp # Management: HTTP, CLI
#- 15671:15671/tcp # Management: SSL port
#- 25672:25672/tcp # inter-node or CLI
#- 4369:4369/tcp # epmd
#

# Used for the coordination backend for integration tests
# NOTE: To speed things up, we only start redis for integration tests
Expand Down Expand Up @@ -199,13 +177,6 @@ jobs:
run: |
echo "$ST2_CI_REPO_PATH"
sudo ST2_CI_REPO_PATH="${ST2_CI_REPO_PATH}" scripts/ci/permissions-workaround.sh
- name: Reconfigure RabbitMQ
# bitnami image allows (see bitnami/rabbitmq readme):
# Here we're copying a rabbitmq.config file which won't do anything.
# We need to switch to custom.conf or advanced.config.
timeout-minutes: 2 # may die if rabbitmq fails to start
run: |
./scripts/github/configure-rabbitmq.sh
- name: Print versions
run: |
./scripts/ci/print-versions.sh
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ coverage*.xml
.tox
nosetests.xml
htmlcov
benchmark_histograms/

# Mr Developer
.idea
Expand Down
13 changes: 13 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ Added

Contributed by @Kami.

* Add support for compressing the payloads which are sent over the message bus. Compression is
disabled by default and user can enable it by setting ``messaging.compression`` config option
to one of the following values: ``zstd``, ``lzma``, ``bz2``, ``gzip``.

In most cases we recommend using ``zstd`` (zstandard) since it offers best trade off between
compression ratio and number of CPU cycles spent for compression and decompression.

How this will affect the deployment and throughput is very much user specific (workflow and
resources available). It may make sense to enable it when generic action trigger is enabled
and when working with executions with large textual results. #5241

Contributed by @Kami.

Changed
~~~~~~~

Expand Down
22 changes: 13 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -558,15 +558,19 @@ micro-benchmarks: requirements .micro-benchmarks
@echo
@echo "==================== micro-benchmarks ===================="
@echo
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_large_execution"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_read_large_execution"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_multiple_fields"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_large_string_value"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_read_large_string_value"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:dict_keys_count_and_depth -s -v st2common/benchmarks/micro/test_fast_deepcopy.py -k "test_fast_deepcopy_with_dict_values"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_fast_deepcopy.py -k "test_fast_deepcopy_with_json_fixture_file"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file,param:indent_sort_keys_tuple -s -v st2common/benchmarks/micro/test_json_serialization_and_deserialization.py -k "test_json_dumps"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_json_serialization_and_deserialization.py -k "test_json_loads"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_large_execution"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_read_large_execution"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_multiple_fields"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_save_large_string_value"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_field_types.py -k "test_read_large_string_value"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_transport_compression.py -k "test_save_execution"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_mongo_transport_compression.py -k "test_read_execution"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:dict_keys_count_and_depth -s -v st2common/benchmarks/micro/test_fast_deepcopy.py -k "test_fast_deepcopy_with_dict_values"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_fast_deepcopy.py -k "test_fast_deepcopy_with_json_fixture_file"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file,param:indent_sort_keys_tuple -s -v st2common/benchmarks/micro/test_json_serialization_and_deserialization.py -k "test_json_dumps"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_json_serialization_and_deserialization.py -k "test_json_loads"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_publisher_compression.py -k "test_pickled_object_compression"
. $(VIRTUALENV_DIR)/bin/activate; pytest --benchmark-histogram=benchmark_histograms/benchmark --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-group-by=group,param:fixture_file -s -v st2common/benchmarks/micro/test_publisher_compression.py -k "test_pickled_object_compression_publish"

.PHONY: .cleanmongodb
.cleanmongodb:
Expand Down
2 changes: 2 additions & 0 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ redirect_stderr = False
[messaging]
# URL of all the nodes in a messaging service cluster.
cluster_urls = # comma separated list allowed here.
# Compression algorithm to use for compressing the payloads which are sent over the message bus. Valid values include: zstd, lzma, bz2, gzip. Defaults to no compression.
compression = None
# How many times should we retry connection before failing.
connection_retries = 10
# How long should we wait between connection retries.
Expand Down
3 changes: 2 additions & 1 deletion scripts/github/configure-rabbitmq.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ echo
sudo wget http://guest:guest@localhost:15672/cli/rabbitmqadmin -O /usr/local/bin/rabbitmqadmin
sudo chmod +x /usr/local/bin/rabbitmqadmin
# print logs from stdout (RABBITMQ_LOGS=-)
docker logs --tail=20 rabbitmq
docker logs --tail=100 rabbitmq
# TODO: Fail here if service fails to start and exit with -2
4 changes: 2 additions & 2 deletions st2api/st2api/cmd/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
from st2common.service_setup import teardown as common_teardown
from st2api import config

config.register_opts()
from st2api import app
config.register_opts(ignore_errors=True)

from st2api import app
from st2api.validation import validate_rbac_is_correctly_configured

__all__ = ["main"]
Expand Down
2 changes: 1 addition & 1 deletion st2auth/st2auth/cmd/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from st2common.service_setup import teardown as common_teardown
from st2auth import config

config.register_opts()
config.register_opts(ignore_errors=True)

from st2auth import app
from st2auth.validation import validate_auth_backend_is_correctly_configured
Expand Down
66 changes: 66 additions & 0 deletions st2common/benchmarks/micro/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2021 The StackStorm Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import pytest

# Directory which contains this module and the directory with the JSON fixture files used by the
# micro benchmarks.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FIXTURES_DIR = os.path.abspath(os.path.join(BASE_DIR, "../fixtures/json"))

# The only fixture file which is left out of the "no 8 MB" parameter set.
_LARGE_FIXTURE_FILE = "json_8mb.json"

# All the fixture files, in the order in which the benchmarks are parametrized.
_ALL_FIXTURE_FILES = [
    "tiny_1.json",
    "json_61kb.json",
    "json_647kb.json",
    "json_4mb.json",
    _LARGE_FIXTURE_FILE,
    "json_4mb_single_large_field.json",
]


def _fixture_file_param(file_names):
    """
    Return a ``pytest.mark.parametrize`` mark for the ``fixture_file`` argument.

    :param file_names: Fixture file names (with ``.json`` extension) to parametrize over.
    :type file_names: ``list`` of ``str``

    :return: Parametrize mark where each test id is the file name without the extension.
    """
    test_ids = [os.path.splitext(file_name)[0] for file_name in file_names]
    return pytest.mark.parametrize("fixture_file", list(file_names), ids=test_ids)


PYTEST_FIXTURE_FILE_PARAM_NO_8MB_DECORATOR = _fixture_file_param(
    [file_name for file_name in _ALL_FIXTURE_FILES if file_name != _LARGE_FIXTURE_FILE]
)

# NOTE: On CI we skip 8 MB fixture file since it's very slow and substantially slows down that
# workflow.
ST2_CI = os.environ.get("ST2_CI", "false").lower() == "true"

PYTEST_FIXTURE_FILE_PARAM_DECORATOR = (
    PYTEST_FIXTURE_FILE_PARAM_NO_8MB_DECORATOR
    if ST2_CI
    else _fixture_file_param(_ALL_FIXTURE_FILES)
)
14 changes: 8 additions & 6 deletions st2common/benchmarks/micro/test_fast_deepcopy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Copyright 2020 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
# Copyright 2021 The StackStorm Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -21,6 +20,10 @@
# TODO: Also use actual orquesta context and execution fixture files which contain real life data
# with large text strings, different value types, etc.

from st2common.util.monkey_patch import monkey_patch

monkey_patch()

import os
import copy
import random
Expand All @@ -31,8 +34,7 @@
import ujson
import orjson

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FIXTURES_DIR = os.path.abspath(os.path.join(BASE_DIR, "../fixtures/json"))
from common import FIXTURES_DIR


def generate_random_dict(keys_count=10, depth=1):
Expand Down Expand Up @@ -103,7 +105,7 @@ def run_benchmark():
else:
raise ValueError("Invalid implementation: %s" % (implementation))

result = benchmark.pedantic(run_benchmark, iterations=50, rounds=50)
result = benchmark(run_benchmark)
assert result == data, "Output is not the same as the input"


Expand Down Expand Up @@ -148,5 +150,5 @@ def run_benchmark():
else:
raise ValueError("Invalid implementation: %s" % (implementation))

result = benchmark.pedantic(run_benchmark, iterations=5, rounds=5)
result = benchmark(run_benchmark)
assert result == data, "Output is not the same as the input"
Loading

0 comments on commit 78ae28e

Please sign in to comment.