Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow background tasks to be run on a separate worker. #8369

Merged
merged 20 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
71ade09
Allow background tasks to be run on a separate worker.
clokep Sep 21, 2020
094b11e
Only run function on background worker.
clokep Sep 22, 2020
fd8aad3
Accept a worker name instead of using a flag.
clokep Sep 22, 2020
67c6fa4
Remove check for being the background_worker app.
clokep Sep 22, 2020
08a7d5d
Do not allow a non-existent worker app.
clokep Sep 23, 2020
0d06a87
Also run the user directory updating in the backgronud.
clokep Sep 24, 2020
2e98b78
Ensure the proper handlers are loaded during start-up.
clokep Sep 24, 2020
b77b89b
Backout some of the changes to simplify the PR.
clokep Sep 25, 2020
f26b92e
Ensure that the background tasks have access to the proper stores.
clokep Sep 25, 2020
53a4402
Add some documentation.
clokep Sep 25, 2020
82d167b
Do not require a replication endpoint for the instance map.
clokep Sep 25, 2020
0c9e970
Clarify confusing wording.
clokep Sep 25, 2020
d40aff7
Do not require the background worker instance to be in the instance map.
clokep Sep 29, 2020
c015379
Update the sample config.
clokep Sep 29, 2020
cfe28f2
The user directory background tasks are controlled by a separate flag.
clokep Sep 29, 2020
f0c83d9
Rename instance variable.
clokep Sep 30, 2020
b226c49
Allow the phone home stats to be able to run on any worker.
clokep Sep 30, 2020
cb740cf
Move around some storage classes to make the proper metrics methods a…
clokep Sep 30, 2020
179d8c3
Consolidate logic for starting metrics calls.
clokep Sep 30, 2020
9fb2c05
Merge remote-tracking branch 'origin/develop' into clokep/background-…
clokep Sep 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8369.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.
6 changes: 6 additions & 0 deletions docs/sample_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2458,6 +2458,12 @@ opentracing:
# events: worker1
# typing: worker1

# The worker that is used to run background tasks (e.g. cleaning up expired
# data). This should be one of the workers from `instance_map`. If not
# provided this defaults to the main process.
#
#run_background_tasks: worker1


# Configuration for Redis when using workers. This *must* be enabled when
# using workers (unless using old style direct TCP configuration).
Expand Down
17 changes: 17 additions & 0 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,23 @@ stream_writers:
events: event_persister1
```

#### Background tasks

There is also *experimental* support for moving background tasks to a separate
worker. Background tasks are run periodically or started via replication. Exactly
which tasks are configured to run depends on your Synapse configuration (e.g. if
stats is enabled).

To enable this, the worker must have a `worker_name` and be listed in the
`instance_map` config. For example, to move background tasks to a dedicated
worker, the shared configuration would include:

```yaml
instance_map:
background_worker: null

run_background_tasks: background_worker
```

### `synapse.app.pusher`

Expand Down
2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
)
from synapse.storage.databases.main.presence import UserPresenceState
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
Expand Down Expand Up @@ -454,6 +455,7 @@ class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
UserDirectoryStore,
StatsStore,
UIAuthWorkerStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
Expand Down
18 changes: 10 additions & 8 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,6 @@ def setup(config_options):
except UpgradeDatabaseException as e:
quit_with_error("Failed to upgrade database: %s" % (e,))

hs.setup_master()

async def do_acme() -> bool:
"""
Reprovision an ACME certificate, if it's required.
Expand Down Expand Up @@ -620,16 +618,18 @@ def generate_user_daily_visit_stats():
# Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits
# table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
if hs.config.run_background_tasks:
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)

# monthly active user limiting functionality
def reap_monthly_active_users():
return run_as_background_process(
"reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
)

clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()
if hs.config.run_background_tasks:
clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()
clokep marked this conversation as resolved.
Show resolved Hide resolved

async def generate_monthly_active_users():
current_mau_count = 0
Expand All @@ -655,12 +655,14 @@ def start_generate_monthly_active_users():
"generate_monthly_active_users", generate_monthly_active_users
)

start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
if hs.config.run_background_tasks and (
hs.config.limit_usage_by_mau or hs.config.mau_stats_only
):
start_generate_monthly_active_users()
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings

if hs.config.report_stats:
if hs.config.run_background_tasks and hs.config.report_stats:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
clokep marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)

Expand Down
30 changes: 28 additions & 2 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ def read_config(self, config, **kwargs):
federation_sender_instances
)

# A map from instance name to host/port of their HTTP replication endpoint.
# A map from instance name to host/port of their HTTP replication endpoint
# (or None if there's no HTTP replication endpoint).
instance_map = config.get("instance_map") or {}
self.instance_map = {
name: InstanceLocationConfig(**c) for name, c in instance_map.items()
name: InstanceLocationConfig(**c) if c else None
for name, c in instance_map.items()
}
clokep marked this conversation as resolved.
Show resolved Hide resolved

# Map from type of streams to source, c.f. WriterLocations.
Expand All @@ -132,6 +134,24 @@ def read_config(self, config, **kwargs):

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)

# Whether this worker should run background tasks or not.
#
# As a note for developers, the background tasks guarded by this should
# be able to run on only a single instance (meaning that they don't
# depend on any in-memory state of a particular worker).
#
# Effort is not made to ensure only a single instance of these tasks is
# running.
instance = config.get("run_background_tasks_on") or "master"
clokep marked this conversation as resolved.
Show resolved Hide resolved
if instance != "master" and instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured to run background tasks but does not appear in `instance_map` config."
% (instance,)
)
self.run_background_tasks = (
self.worker_name is None and instance == "master"
) or self.worker_name == instance

def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
## Workers ##
Expand Down Expand Up @@ -167,6 +187,12 @@ def generate_config_section(self, config_dir_path, server_name, **kwargs):
#stream_writers:
# events: worker1
# typing: worker1

# The worker that is used to run background tasks (e.g. cleaning up expired
# data). This should be one of the workers from `instance_map`. If not
# provided this defaults to the main process.
#
#run_background_tasks: worker1
"""

def read_arguments(self, args):
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def __init__(self, hs):
self._clock = self.hs.get_clock()

# Expire old UI auth sessions after a period of time.
if hs.config.worker_app is None:
if hs.config.run_background_tasks:
self._clock.looping_call(
run_as_background_process,
5 * 60 * 1000,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, hs):
# Guard to ensure we only process deltas one at a time
self._is_processing = False

if hs.config.stats_enabled:
if self.stats_enabled and hs.config.run_background_tasks:
self.notifier.add_replication_callback(self.notify_new_event)

# We kick this off so that we don't have to wait for a change before
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, hs):
# Guard to ensure we only process deltas one at a time
self._is_processing = False

if self.update_user_directory:
if hs.config.run_background_tasks and self.update_user_directory:
self.notifier.add_replication_callback(self.notify_new_event)

# We kick this off so that we don't have to wait for a change before
Expand Down
15 changes: 11 additions & 4 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,11 @@ class HomeServer(metaclass=abc.ABCMeta):
we are listening on to provide HTTP services.
"""

REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
REQUIRED_ON_BACKGROUND_TASK_STARTUP = [
"auth",
"stats",
"user_directory",
]

# This is overridden in derived application classes
# (such as synapse.app.homeserver.SynapseHomeServer) and gives the class to be
Expand Down Expand Up @@ -251,14 +255,17 @@ def setup(self) -> None:
self.datastores = Databases(self.DATASTORE_CLASS, self)
logger.info("Finished setting up.")

def setup_master(self) -> None:
if self.config.run_background_tasks:
self.setup_background_tasks()

def setup_background_tasks(self) -> None:
"""
Some handlers have side effects on instantiation (like registering
background updates). This function causes them to be fetched, and
therefore instantiated, to run those side effects.
"""
for i in self.REQUIRED_ON_MASTER_STARTUP:
getattr(self, "get_" + i)()
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
getattr(self, "get_" + i + "_handler")()

def get_reactor(self) -> twisted.internet.base.ReactorBase:
"""
Expand Down
6 changes: 4 additions & 2 deletions synapse/storage/databases/main/ui_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,6 @@ async def get_user_agents_ips_to_ui_auth_session(
)
return [(row["user_agent"], row["ip"]) for row in rows]


class UIAuthStore(UIAuthWorkerStore):
async def delete_old_ui_auth_sessions(self, expiration_time: int) -> None:
"""
Remove sessions which were last used earlier than the expiration time.
Expand Down Expand Up @@ -339,3 +337,7 @@ def _delete_old_ui_auth_sessions_txn(
iterable=session_ids,
keyvalues={},
)


class UIAuthStore(UIAuthWorkerStore):
pass
Comment on lines +342 to +343
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make more sense to remove the UIAuthWorkerStore and just use UIAuthStore everywhere instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially, though its not the worse to keep the separation for consistency.

2 changes: 1 addition & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def setup_test_homeserver(

hs.setup()
if homeserverToUse.__name__ == "TestHomeServer":
hs.setup_master()
hs.setup_background_tasks()

if isinstance(db_engine, PostgresEngine):
database = hs.get_datastores().databases[0]
Expand Down