diff --git a/.ci/scripts/calculate_jobs.py b/.ci/scripts/calculate_jobs.py
index c2198e0dd49b..c8741e68489b 100755
--- a/.ci/scripts/calculate_jobs.py
+++ b/.ci/scripts/calculate_jobs.py
@@ -133,3 +133,73 @@ def set_output(key: str, value: str):
 
 test_matrix = json.dumps(sytest_tests)
 set_output("sytest_test_matrix", test_matrix)
+
+
+# Calculate a comprehensive list of worker types, so we can hunt for problems specific to each.
+# There is no need to flag these as 'worker' setups, because by definition they all are.
+# Postgres is implied, because it is currently required for worker mode.
+
+complement_single_worker_tests = [
+    {
+        "worker_types": workers,
+    }
+    for workers in (
+        "account_data",
+        "appservice",
+        "background_worker",
+        "event_creator",
+        "event_persister",
+        "federation_inbound",
+        "federation_reader",
+        "federation_sender",
+        "frontend_proxy",
+        "media_repository",
+        "presence",
+        "pusher",
+        "receipts",
+        "synchrotron",
+        "to_device",
+        "typing",
+        "user_dir",
+    )
+]
+
+complement_sharding_worker_tests = [
+    {"worker_types": "event_persister, event_persister, event_persister"},
+    {"worker_types": "federation_sender, federation_sender, federation_sender"},
+    {"worker_types": "pusher, pusher, pusher"},
+    {"worker_types": "synchrotron, synchrotron, synchrotron"},
+]
+
+complement_stream_writers_worker_tests = [
+    {
+        "worker_types": "account_data, event_persister, presence, receipts, to_device, typing"
+    }
+]
+
+complement_fullset_worker_tests = [
+    {
+        "worker_types": "account_data, appservice, background_worker, event_creator, event_persister, event_persister, federation_inbound, federation_reader, federation_sender, federation_sender, frontend_proxy, media_repository, pusher, pusher, synchrotron, to_device, typing, user_dir"
+    }
+]
+
+print("::group::Calculated Complement jobs")
+print(
+    json.dumps(
+        complement_single_worker_tests
+        + complement_sharding_worker_tests
+        + complement_stream_writers_worker_tests
+        + complement_fullset_worker_tests,
+        indent=4,
+    )
+)
+print("::endgroup::")
+
+test_matrix = json.dumps(complement_single_worker_tests)
+set_output("complement_singles_test_matrix", test_matrix)
+test_matrix = json.dumps(complement_sharding_worker_tests)
+set_output("complement_sharding_test_matrix", test_matrix)
+test_matrix = json.dumps(complement_stream_writers_worker_tests)
+set_output("complement_stream_writers_test_matrix", test_matrix)
+test_matrix = json.dumps(complement_fullset_worker_tests)
+set_output("complement_fullset_test_matrix", test_matrix)
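
For reference, the sharding matrix above serializes (via json.dumps, wrapped here for readability) to:

    [{"worker_types": "event_persister, event_persister, event_persister"},
     {"worker_types": "federation_sender, federation_sender, federation_sender"},
     {"worker_types": "pusher, pusher, pusher"},
     {"worker_types": "synchrotron, synchrotron, synchrotron"}]

The workflow below reads each exported matrix back with fromJson() and fans out one Complement job per entry. (How set_output() delivers the string to GitHub Actions is defined earlier in this script and not shown in this diff.)
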
diff --git a/.github/workflows/complement.yml b/.github/workflows/complement.yml
new file mode 100644
index 000000000000..722c568636d4
--- /dev/null
+++ b/.github/workflows/complement.yml
@@ -0,0 +1,161 @@
+name: Complement Tests
+
+on:
+#  push:
+#    branches: ["develop", "release-*"]
+#  pull_request:
+#  schedule:
+#    e.g. runs every day at 4am - 0 4 * * *
+#    e.g. runs every Sunday at 4am - 0 4 * * 0
+
+# Note: this will only work once this workflow exists on the default branch, which is 'develop'
+  workflow_dispatch:
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  calculate-test-jobs:
+    name: "Calculate Test Jobs"
+    if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - uses: actions/setup-python@v4
+      - id: get-matrix
+        run: .ci/scripts/calculate_jobs.py
+    outputs:
+      complement_singles_test_matrix: ${{ steps.get-matrix.outputs.complement_singles_test_matrix }}
+      complement_sharding_test_matrix: ${{ steps.get-matrix.outputs.complement_sharding_test_matrix }}
+      complement_stream_writers_test_matrix: ${{ steps.get-matrix.outputs.complement_stream_writers_test_matrix }}
+
+  singles:
+    name: Singles
+    if: "${{ !failure() && !cancelled() }}"
+    needs: calculate-test-jobs
+    runs-on: ubuntu-latest
+
+    strategy:
+      fail-fast: false
+      matrix:
+        job: ${{ fromJson(needs.calculate-test-jobs.outputs.complement_singles_test_matrix) }}
+
+    steps:
+      - name: Run actions/checkout@v3 for synapse
+        uses: actions/checkout@v3
+        with:
+          path: synapse
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+          toolchain: 1.58.1
+          override: true
+      - uses: Swatinem/rust-cache@v2
+
+      - name: Prepare Complement's Prerequisites
+        run: synapse/.ci/scripts/setup_complement_prerequisites.sh
+
+      - name: Run Complement Tests
+        shell: bash
+        env:
+          POSTGRES: "true"
+          WORKERS: "true"
+          SYNAPSE_WORKER_TYPES: ${{ matrix.job.worker_types }}
+        run: |
+          set -o pipefail
+          COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | synapse/.ci/scripts/gotestfmt
+
+
+  sharding:
+    name: Sharding
+    if: "${{ !failure() && !cancelled() }}"
+    needs: calculate-test-jobs
+    runs-on: ubuntu-latest
+
+    strategy:
+      fail-fast: false
+      matrix:
+        job: ${{ fromJson(needs.calculate-test-jobs.outputs.complement_sharding_test_matrix) }}
+
+    steps:
+      - name: Run actions/checkout@v3 for synapse
+        uses: actions/checkout@v3
+        with:
+          path: synapse
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+          toolchain: 1.58.1
+          override: true
+      - uses: Swatinem/rust-cache@v2
+
+      - name: Prepare Complement's Prerequisites
+        run: synapse/.ci/scripts/setup_complement_prerequisites.sh
+
+      - name: Run Complement Tests
+        shell: bash
+        env:
+          POSTGRES: "true"
+          WORKERS: "true"
+          SYNAPSE_WORKER_TYPES: ${{ matrix.job.worker_types }}
+        run: |
+          set -o pipefail
+          COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | synapse/.ci/scripts/gotestfmt
+
+
+  stream_writers:
+    name: All Stream Writers
+    if: "${{ !failure() && !cancelled() }}"
+    needs: calculate-test-jobs
+    runs-on: ubuntu-latest
+
+    strategy:
+      fail-fast: false
+      matrix:
+        job: ${{ fromJson(needs.calculate-test-jobs.outputs.complement_stream_writers_test_matrix) }}
+
+    steps:
+      - name: Run actions/checkout@v3 for synapse
+        uses: actions/checkout@v3
+        with:
+          path: synapse
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+          toolchain: 1.58.1
+          override: true
+      - uses: Swatinem/rust-cache@v2
+
+      - name: Prepare Complement's Prerequisites
+        run: synapse/.ci/scripts/setup_complement_prerequisites.sh
+
+      - name: Run Complement Tests
+        shell: bash
+        env:
+          POSTGRES: "true"
+          WORKERS: "true"
+          SYNAPSE_WORKER_TYPES: ${{ matrix.job.worker_types }}
+        run: |
+          set -o pipefail
+          COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | synapse/.ci/scripts/gotestfmt
+
+
+  # A job which marks all the other jobs as complete, thus allowing PRs to be merged.
+  tests-done:
+    name: Tests Done
+    if: ${{ always() }}
+    needs:
+      - singles
+      - sharding
+      - stream_writers
+    runs-on: ubuntu-latest
+    steps:
+      - uses: matrix-org/done-action@v2
+        with:
+          needs: ${{ toJSON(needs) }}
+
+          # skippable:
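
To debug any one of these combinations outside CI, the equivalent invocation is roughly the following, assuming the synapse and complement checkouts sit side by side as the steps above arrange (the gotestfmt formatter is only a CI log formatter and can be dropped locally):

    POSTGRES=true WORKERS=true \
    SYNAPSE_WORKER_TYPES="account_data, event_persister, presence, receipts, to_device, typing" \
    COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh

The env block of the jobs above is authoritative; the SYNAPSE_WORKER_TYPES value here is copied verbatim from the stream-writers matrix.
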
diff --git a/changelog.d/14197.docker b/changelog.d/14197.docker
new file mode 100644
index 000000000000..529ccd99c501
--- /dev/null
+++ b/changelog.d/14197.docker
@@ -0,0 +1 @@
+Add all Stream Writer worker types to configure_workers_and_start.py.
diff --git a/changelog.d/14202.misc b/changelog.d/14202.misc
new file mode 100644
index 000000000000..c010f7bab81d
--- /dev/null
+++ b/changelog.d/14202.misc
@@ -0,0 +1 @@
+Extend Complement testing to include various worker type combinations.
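
For orientation before the docker changes below: each stream writer the generator configures is appended to the shared config's stream_writers map and registered in the instance_map, so requesting two to_device workers would yield roughly this fragment (worker names are illustrative and the ports stand in for each worker's HTTP replication port, which the generator assigns itself):

    stream_writers:
      to_device: ["to_device1", "to_device2"]
    instance_map:
      to_device1: {host: "localhost", port: <replication port>}
      to_device2: {host: "localhost", port: <replication port>}
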
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 1ea456b2f88e..8f981399b309 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -50,7 +50,13 @@
 
 MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
 
-
+# Workers with exposed endpoints need a "client", "federation", or "media" entry in listener_resources:
+# watching /_matrix/client needs a "client" listener,
+# watching /_matrix/federation needs a "federation" listener, and
+# watching /_matrix/media and related needs a "media" listener.
+# Stream writers require both "client" and "replication" listeners, because they
+# expose client endpoints and must attach to the main process via the instance_map.
+# BIG WARNING: the typing and receipts stream writers are not working correctly at this time.
 WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
     "pusher": {
         "app": "synapse.app.pusher",
@@ -209,6 +215,49 @@
             % (MAIN_PROCESS_HTTP_LISTENER_PORT,)
         ),
     },
+    "account_data": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": [
+            "^/_matrix/client/(r0|v3|unstable)/.*/tags",
+            "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
+        ],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "presence": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "receipts": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": [
+            "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
+            "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
+        ],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "to_device": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "typing": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": [
+            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
+        ],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
 }
 
 # Templates for sections that may be inserted multiple times in config files
@@ -309,6 +358,20 @@ def add_sharding_to_shared_config(
             "port": worker_port,
         }
 
+    elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
+        # Update the list of stream writers.
+        # It's convenient that the name of the worker type is the same as the stream it writes to.
+        shared_config.setdefault("stream_writers", {}).setdefault(
+            worker_type, []
+        ).append(worker_name)
+
+        # Map of stream writer instance names to host/port combos.
+        # For now, all stream writers need HTTP replication ports.
+        instance_map[worker_name] = {
+            "host": "localhost",
+            "port": worker_port,
+        }
+
     elif worker_type == "media_repository":
         # The first configured media worker will run the media background jobs
         shared_config.setdefault("media_instance_running_background_jobs", worker_name)
@@ -441,11 +504,11 @@ def generate_worker_files(
 
         # Check if more than one instance of this worker type has been specified
        worker_type_total_count = worker_types.count(worker_type)
-        if worker_type_total_count > 1:
-            # Update the shared config with sharding-related options if necessary
-            add_sharding_to_shared_config(
-                shared_config, worker_type, worker_name, worker_port
-            )
+
+        # Update the shared config with sharding-related options if necessary
+        add_sharding_to_shared_config(
+            shared_config, worker_type, worker_name, worker_port
+        )
 
         # Enable the worker in supervisord
         worker_descriptors.append(worker_config)