Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow background tasks to be run on a separate worker. #8369

Merged
merged 20 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
71ade09
Allow background tasks to be run on a separate worker.
clokep Sep 21, 2020
094b11e
Only run function on background worker.
clokep Sep 22, 2020
fd8aad3
Accept a worker name instead of using a flag.
clokep Sep 22, 2020
67c6fa4
Remove check for being the background_worker app.
clokep Sep 22, 2020
08a7d5d
Do not allow a non-existent worker app.
clokep Sep 23, 2020
0d06a87
Also run the user directory updating in the background.
clokep Sep 24, 2020
2e98b78
Ensure the proper handlers are loaded during start-up.
clokep Sep 24, 2020
b77b89b
Backout some of the changes to simplify the PR.
clokep Sep 25, 2020
f26b92e
Ensure that the background tasks have access to the proper stores.
clokep Sep 25, 2020
53a4402
Add some documentation.
clokep Sep 25, 2020
82d167b
Do not require a replication endpoint for the instance map.
clokep Sep 25, 2020
0c9e970
Clarify confusing wording.
clokep Sep 25, 2020
d40aff7
Do not require the background worker instance to be in the instance map.
clokep Sep 29, 2020
c015379
Update the sample config.
clokep Sep 29, 2020
cfe28f2
The user directory background tasks are controlled by a separate flag.
clokep Sep 29, 2020
f0c83d9
Rename instance variable.
clokep Sep 30, 2020
b226c49
Allow the phone home stats to be able to run on any worker.
clokep Sep 30, 2020
cb740cf
Move around some storage classes to make the proper metrics methods a…
clokep Sep 30, 2020
179d8c3
Consolidate logic for starting metrics calls.
clokep Sep 30, 2020
9fb2c05
Merge remote-tracking branch 'origin/develop' into clokep/background-…
clokep Sep 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8369.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.
5 changes: 5 additions & 0 deletions docs/sample_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2484,6 +2484,11 @@ opentracing:
# events: worker1
# typing: worker1

# The worker that is used to run background tasks (e.g. cleaning up expired
# data). If not provided this defaults to the main process.
#
#run_background_tasks_on: worker1


# Configuration for Redis when using workers. This *must* be enabled when
# using workers (unless using old style direct TCP configuration).
Expand Down
17 changes: 17 additions & 0 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,23 @@ stream_writers:
events: event_persister1
```

#### Background tasks

There is also *experimental* support for moving background tasks to a separate
worker. Background tasks are run periodically or started via replication. Exactly
which tasks are configured to run depends on your Synapse configuration (e.g. if
stats is enabled).

To enable this, the worker must have a `worker_name` configured. For example, to
move background tasks to a dedicated worker named `background_worker`, the
shared configuration would include:

```yaml
run_background_tasks_on: background_worker
```

You might also wish to investigate the `update_user_directory` and
`media_instance_running_background_jobs` settings.

### `synapse.app.pusher`

Expand Down
6 changes: 6 additions & 0 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import synapse
from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config.server import ListenerConfig
from synapse.crypto import context_factory
from synapse.logging.context import PreserveLoggingContext
Expand Down Expand Up @@ -274,6 +275,11 @@ def handle_sighup(*args, **kwargs):
setup_sentry(hs)
setup_sdnotify(hs)

# If background tasks are running on the main process, start collecting the
# phone home stats.
if hs.config.run_background_tasks:
start_phone_stats_home(hs)

# We now freeze all allocated objects in the hopes that (almost)
# everything currently allocated are things that will be used for the
# rest of time. Doing so means less work each GC (hopefully).
Expand Down
1 change: 1 addition & 0 deletions synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def start(config_options):

# Explicitly disable background processes
config.update_user_directory = False
config.run_background_tasks = False
config.start_pushers = False
config.send_federation = False

Expand Down
4 changes: 4 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,13 @@
from synapse.server import HomeServer, cache_in_self
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.databases.main.presence import UserPresenceState
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
Expand Down Expand Up @@ -454,6 +456,7 @@ class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
UserDirectoryStore,
StatsStore,
UIAuthWorkerStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
Expand All @@ -476,6 +479,7 @@ class GenericWorkerSlavedStore(
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
ServerMetricsStore,
SearchWorkerStore,
BaseSlavedStore,
):
Expand Down
182 changes: 0 additions & 182 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@

import gc
import logging
import math
import os
import resource
import sys
from typing import Iterable

from prometheus_client import Gauge

from twisted.application import service
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
Expand Down Expand Up @@ -60,7 +56,6 @@
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import ModuleApi
from synapse.python_dependencies import check_requirements
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
Expand Down Expand Up @@ -334,20 +329,6 @@ def start_listening(self, listeners: Iterable[ListenerConfig]):
logger.warning("Unrecognized listener type: %s", listener.type)


# Gauges to expose monthly active user control metrics

# Total number of monthly active users (set from
# store.get_monthly_active_count()).
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
# Monthly active users broken down per application service, using the
# service as the metric label.
current_mau_by_service_gauge = Gauge(
    "synapse_admin_mau_current_mau_by_service",
    "Current MAU by service",
    ["app_service"],
)
# The configured MAU cap (hs.config.max_mau_value).
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
# Number of registered users holding reserved threepids (set from
# store.get_registered_reserved_users()).
registered_reserved_users_mau_gauge = Gauge(
    "synapse_admin_mau:registered_reserved_users",
    "Registered users with reserved threepids",
)


def setup(config_options):
"""
Args:
Expand Down Expand Up @@ -389,8 +370,6 @@ def setup(config_options):
except UpgradeDatabaseException as e:
quit_with_error("Failed to upgrade database: %s" % (e,))

hs.setup_master()

async def do_acme() -> bool:
"""
Reprovision an ACME certificate, if it's required.
Expand Down Expand Up @@ -486,92 +465,6 @@ def stopService(self):
return self._port.stopListening()


# Contains the list of processes we will be monitoring
# currently either 0 or 1
_stats_process = []


async def phone_stats_home(hs, stats, stats_process=_stats_process):
    """Gather usage statistics and report them to the configured endpoint.

    Run periodically (scheduled from ``run``) to PUT a JSON blob of
    anonymised usage statistics to ``hs.config.report_stats_endpoint``.

    Args:
        hs: the HomeServer, used for its clock, config and datastore.
        stats: dict accumulating the statistics; mutated in place and then
            sent as the request body.
        stats_process: one-element list holding the previous
            ``(timestamp, rusage)`` sample. The mutable default is
            deliberate: it shares the module-level ``_stats_process`` so
            CPU usage can be computed as a delta between successive calls.
    """
    logger.info("Gathering stats for reporting")
    now = int(hs.get_clock().time())
    # Clamp uptime to zero in case the clock has gone backwards.
    uptime = int(now - hs.start_time)
    if uptime < 0:
        uptime = 0

    #
    # Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test.
    #
    old = stats_process[0]
    new = (now, resource.getrusage(resource.RUSAGE_SELF))
    stats_process[0] = new

    # Get RSS in bytes
    # NOTE(review): ru_maxrss is kilobytes on Linux and bytes on macOS —
    # confirm the consumer's expected units.
    stats["memory_rss"] = new[1].ru_maxrss

    # Get CPU time in % of a single core, not % of all cores
    used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
        old[1].ru_utime + old[1].ru_stime
    )
    # Guard against two samples taken at the same whole-second timestamp,
    # which would otherwise divide by zero below.
    if used_cpu_time == 0 or new[0] == old[0]:
        stats["cpu_average"] = 0
    else:
        stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)

    #
    # General statistics
    #

    stats["homeserver"] = hs.config.server_name
    stats["server_context"] = hs.config.server_context
    stats["timestamp"] = now
    stats["uptime_seconds"] = uptime
    version = sys.version_info
    stats["python_version"] = "{}.{}.{}".format(
        version.major, version.minor, version.micro
    )
    stats["total_users"] = await hs.get_datastore().count_all_users()

    total_nonbridged_users = await hs.get_datastore().count_nonbridged_users()
    stats["total_nonbridged_users"] = total_nonbridged_users

    # Per-user-type daily counts, keyed as "daily_user_type_<name>".
    daily_user_type_results = await hs.get_datastore().count_daily_user_type()
    for name, count in daily_user_type_results.items():
        stats["daily_user_type_" + name] = count

    room_count = await hs.get_datastore().get_room_count()
    stats["total_room_count"] = room_count

    stats["daily_active_users"] = await hs.get_datastore().count_daily_users()
    stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users()
    stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms()
    stats["daily_messages"] = await hs.get_datastore().count_daily_messages()

    # Retention figures, keyed as "r30_users_<name>" — presumably 30-day
    # returning users broken down by platform; verify against the store.
    r30_results = await hs.get_datastore().count_r30_users()
    for name, count in r30_results.items():
        stats["r30_users_" + name] = count

    daily_sent_messages = await hs.get_datastore().count_daily_sent_messages()
    stats["daily_sent_messages"] = daily_sent_messages
    stats["cache_factor"] = hs.config.caches.global_factor
    stats["event_cache_size"] = hs.config.caches.event_cache_size

    #
    # Database version
    #

    # This only reports info about the *main* database.
    stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__
    stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version

    logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
    # Reporting is best-effort: a failed upload is logged and dropped rather
    # than allowed to break the looping call that schedules this function.
    try:
        await hs.get_proxied_http_client().put_json(
            hs.config.report_stats_endpoint, stats
        )
    except Exception as e:
        logger.warning("Error reporting stats: %s", e)


def run(hs):
PROFILE_SYNAPSE = False
if PROFILE_SYNAPSE:
Expand All @@ -597,81 +490,6 @@ def profiled(*args, **kargs):
ThreadPool._worker = profile(ThreadPool._worker)
reactor.run = profile(reactor.run)

clock = hs.get_clock()

stats = {}

def performance_stats_init():
_stats_process.clear()
_stats_process.append(
(int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
)

def start_phone_stats_home():
return run_as_background_process(
"phone_stats_home", phone_stats_home, hs, stats
)

def generate_user_daily_visit_stats():
return run_as_background_process(
"generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits
)

# Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits
# table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)

# monthly active user limiting functionality
def reap_monthly_active_users():
return run_as_background_process(
"reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
)

clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()

async def generate_monthly_active_users():
current_mau_count = 0
current_mau_count_by_service = {}
reserved_users = ()
store = hs.get_datastore()
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
current_mau_count = await store.get_monthly_active_count()
current_mau_count_by_service = (
await store.get_monthly_active_count_by_service()
)
reserved_users = await store.get_registered_reserved_users()
current_mau_gauge.set(float(current_mau_count))

for app_service, count in current_mau_count_by_service.items():
current_mau_by_service_gauge.labels(app_service).set(float(count))

registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
max_mau_gauge.set(float(hs.config.max_mau_value))

def start_generate_monthly_active_users():
return run_as_background_process(
"generate_monthly_active_users", generate_monthly_active_users
)

start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings

if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)

# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
clock.call_later(0, performance_stats_init)

# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(5 * 60, start_phone_stats_home)

_base.start_reactor(
"synapse-homeserver",
soft_file_limit=hs.config.soft_file_limit,
Expand Down
Loading