Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Allow background tasks to be run on a separate worker.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Sep 21, 2020
1 parent 4f3096d commit 71ade09
Show file tree
Hide file tree
Showing 21 changed files with 82 additions and 35 deletions.
1 change: 1 addition & 0 deletions changelog.d/8369.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.
17 changes: 17 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ def start(config_options):
# For backwards compatibility let any of the old app names.
assert config.worker_app in (
"synapse.app.appservice",
"synapse.app.background_worker",
"synapse.app.client_reader",
"synapse.app.event_creator",
"synapse.app.federation_reader",
Expand Down Expand Up @@ -961,6 +962,22 @@ def start(config_options):
# For other worker types we force this to off.
config.worker.send_federation = False

if config.worker_app == "synapse.app.background_worker":
if config.worker.run_background_tasks:
sys.stderr.write(
"\nThe run_background_tasks must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``run_background_tasks: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the background tasks to start since they will be disabled in the main config
config.worker.run_background_tasks = True
else:
# For other worker types we force this to off.
config.worker.run_background_tasks = False

synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

hs = GenericWorkerServer(
Expand Down
14 changes: 9 additions & 5 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,16 +620,18 @@ def generate_user_daily_visit_stats():
# Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits
# table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
if hs.config.run_background_tasks:
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)

# monthly active user limiting functionality
def reap_monthly_active_users():
return run_as_background_process(
"reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
)

clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()
if hs.config.run_background_tasks:
clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()

async def generate_monthly_active_users():
current_mau_count = 0
Expand All @@ -656,11 +658,13 @@ def start_generate_monthly_active_users():
)

start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
if hs.config.run_background_tasks and (
hs.config.limit_usage_by_mau or hs.config.mau_stats_only
):
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings

if hs.config.report_stats:
if hs.config.run_background_tasks and hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)

Expand Down
10 changes: 10 additions & 0 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ def read_config(self, config, **kwargs):

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)

# Whether this worker should run background tasks or not.
#
# As a note for developers, the background tasks guarded by this should
# be able to run on only a single instance (meaning that they don't
# depend on any in-memory state of a particular worker).
#
# Effort is not made to ensure only a single instance of these tasks is
# running.
self.run_background_tasks = config.get("run_background_tasks", True)

def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
## Workers ##
Expand Down
1 change: 1 addition & 0 deletions synapse/handlers/account_validity.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, hs):
if (
self._account_validity.enabled
and self._account_validity.renew_by_email_enabled
and hs.config.run_background_tasks
):
# Don't do email-specific configuration if renewal by email is disabled.
self._template_html = self.config.account_validity_template_html
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def __init__(self, hs):
self._clock = self.hs.get_clock()

# Expire old UI auth sessions after a period of time.
if hs.config.worker_app is None:
if hs.config.run_background_tasks:
self._clock.looping_call(
run_as_background_process,
5 * 60 * 1000,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, hs):

# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
if hs.config.worker_app is None:
if hs.config.run_background_tasks:
hs.get_reactor().callWhenRunning(self._start_user_parting)

self._account_validity_enabled = hs.config.account_validity.enabled
Expand Down
13 changes: 7 additions & 6 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,12 +522,13 @@ def __init__(self, hs, device_handler):

# Attempt to resync out of sync device lists every 30s.
self._resync_retry_in_progress = False
self.clock.looping_call(
run_as_background_process,
30 * 1000,
func=self._maybe_retry_device_resync,
desc="_maybe_retry_device_resync",
)
if hs.config.run_background_tasks:
self.clock.looping_call(
run_as_background_process,
30 * 1000,
func=self._maybe_retry_device_resync,
desc="_maybe_retry_device_resync",
)

@trace
async def incoming_device_list_update(self, origin, edu_content):
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def __init__(self, hs: "HomeServer"):
self._consent_uri_builder = ConsentURIBuilder(self.config)

if (
not self.config.worker_app
self.config.run_background_tasks
and self.config.cleanup_extremities_with_dummy_events
):
self.clock.looping_call(
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def __init__(self, hs: "HomeServer"):
self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min
self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max

if hs.config.retention_enabled:
if hs.config.run_background_tasks and hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
logger.info("Setting up purge job with config: %s", job)
Expand Down
9 changes: 4 additions & 5 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,10 @@ class MasterProfileHandler(BaseProfileHandler):
def __init__(self, hs):
super().__init__(hs)

assert hs.config.worker_app is None

self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
)
if hs.config.run_background_tasks:
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
)

def _start_update_remote_profile_cache(self):
return run_as_background_process(
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, hs):
# Guard to ensure we only process deltas one at a time
self._is_processing = False

if hs.config.stats_enabled:
if self.stats_enabled and hs.config.run_background_tasks:
self.notifier.add_replication_callback(self.notify_new_event)

# We kick this off so that we don't have to wait for a change before
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/censor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def _censor_redactions():
"_censor_redactions", self._censor_redactions
)

if self.hs.config.redaction_retention_period is not None:
if (
self.hs.config.run_background_tasks
and self.hs.config.redaction_retention_period is not None
):
hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)

async def _censor_redactions(self):
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
"before", "shutdown", self._update_client_ips_batch
)

if self.user_ips_max_age:
if hs.config.run_background_tasks and self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

async def insert_client_ip(
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
name="device_id_exists", keylen=2, max_entries=10000
)

self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)
if hs.config.run_background_tasks:
self._clock.looping_call(
self._prune_old_outbound_device_pokes, 60 * 60 * 1000
)

async def store_device(
self, user_id: str, device_id: str, initial_device_display_name: str
Expand Down
7 changes: 4 additions & 3 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
)

hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
)
if hs.config.run_background_tasks:
hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
)

def _delete_old_forward_extrem_cache(self):
def _delete_old_forward_extrem_cache_txn(txn):
Expand Down
9 changes: 5 additions & 4 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,9 +679,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
)

self._doing_notif_rotation = False
self._rotate_notif_loop = self._clock.looping_call(
self._start_rotate_notifs, 30 * 60 * 1000
)
if hs.config.run_background_tasks:
self._rotate_notif_loop = self._clock.looping_call(
self._start_rotate_notifs, 30 * 60 * 1000
)

async def get_push_actions_for_user(
self, user_id, before=None, limit=50, only_highlight=False
Expand Down Expand Up @@ -741,7 +742,7 @@ def _remove_old_push_actions_before_txn(
users can still get a list of recent highlights.
Args:
txn: The transcation
txn: The transaction
room_id: Room ID to delete from
user_id: user ID to delete for
stream_ordering: The lowest stream ordering which will
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/databases/main/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def read_forward_extremities():
"read_forward_extremities", self._read_forward_extremities
)

hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
if hs.config.run_background_tasks:
hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)

async def _read_forward_extremities(self):
def fetch(txn):
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/databases/main/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,8 @@ def start_cull():
self.cull_expired_threepid_validation_tokens,
)

hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
if hs.config.run_background_tasks:
hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)

async def add_access_token_to_user(
self,
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._check_safe_current_state_events_membership_updated_txn(txn)
txn.close()

if self.hs.config.metrics_flags.known_servers:
if (
self.hs.config.run_background_tasks
and self.hs.config.metrics_flags.known_servers
):
self._known_servers_count = 1
self.hs.get_clock().looping_call(
run_as_background_process,
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class TransactionStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
if hs.config.run_background_tasks:
self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)

self._destination_retry_cache = ExpiringCache(
cache_name="get_destination_retry_timings",
Expand Down

0 comments on commit 71ade09

Please sign in to comment.