diff --git a/changelog.d/8369.feature b/changelog.d/8369.feature new file mode 100644 index 000000000000..542993110bc8 --- /dev/null +++ b/changelog.d/8369.feature @@ -0,0 +1 @@ +Allow running background tasks in a separate worker process. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index c38413c8937b..9ac567607ff2 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -885,6 +885,7 @@ def start(config_options): # For backwards compatibility let any of the old app names. assert config.worker_app in ( "synapse.app.appservice", + "synapse.app.background_worker", "synapse.app.client_reader", "synapse.app.event_creator", "synapse.app.federation_reader", @@ -961,6 +962,22 @@ def start(config_options): # For other worker types we force this to off. config.worker.send_federation = False + if config.worker_app == "synapse.app.background_worker": + if config.worker.run_background_tasks: + sys.stderr.write( + "\nThe run_background_tasks must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``run_background_tasks: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the background tasks to start since they will be disabled in the main config + config.worker.run_background_tasks = True + else: + # For other worker types we force this to off. + config.worker.run_background_tasks = False + synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts hs = GenericWorkerServer( diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index dff739e1062d..9ef438bec529 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -620,7 +620,8 @@ def generate_user_daily_visit_stats(): # Rather than update on per session basis, batch up the requests. # If you increase the loop period, the accuracy of user_daily_visits # table will decrease - clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) + if hs.config.run_background_tasks: + clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) # monthly active user limiting functionality def reap_monthly_active_users(): @@ -628,8 +629,9 @@ def reap_monthly_active_users(): "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users ) - clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) - reap_monthly_active_users() + if hs.config.run_background_tasks: + clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) + reap_monthly_active_users() async def generate_monthly_active_users(): current_mau_count = 0 @@ -656,11 +658,13 @@ def start_generate_monthly_active_users(): ) start_generate_monthly_active_users() - if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: + if hs.config.run_background_tasks and ( + hs.config.limit_usage_by_mau or hs.config.mau_stats_only + ): clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) # End of monthly active user settings - if hs.config.report_stats: + if hs.config.run_background_tasks and hs.config.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index f23e42cdf98c..3df07c83bd9d 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -132,6 +132,16 @@ def read_config(self, config, **kwargs): self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events) + # Whether this worker should run background tasks or not. 
+ # + # As a note for developers, the background tasks guarded by this should + # be able to run on only a single instance (meaning that they don't + # depend on any in-memory state of a particular worker). + # + # Effort is not made to ensure only a single instance of these tasks is + # running. + self.run_background_tasks = config.get("run_background_tasks", True) + def generate_config_section(self, config_dir_path, server_name, **kwargs): return """\ ## Workers ## diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index 4caf6d591af5..abab471f5d5d 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -42,6 +42,7 @@ def __init__(self, hs): if ( self._account_validity.enabled and self._account_validity.renew_by_email_enabled + and hs.config.run_background_tasks ): # Don't do email-specific configuration if renewal by email is disabled. self._template_html = self.config.account_validity_template_html diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 0322b60cfc63..698a2495c2d3 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -203,7 +203,7 @@ def __init__(self, hs): self._clock = self.hs.get_clock() # Expire old UI auth sessions after a period of time. - if hs.config.worker_app is None: + if hs.config.run_background_tasks: self._clock.looping_call( run_as_background_process, 5 * 60 * 1000, diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 0635ad570866..8c29c4e97298 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -42,7 +42,7 @@ def __init__(self, hs): # Start the user parter loop so it can resume parting users from rooms where # it left off (if it has work left to do). - if hs.config.worker_app is None: + if hs.config.run_background_tasks: hs.get_reactor().callWhenRunning(self._start_user_parting) self._account_validity_enabled = hs.config.account_validity.enabled diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 55a978743988..19fd6bb0c799 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -522,12 +522,13 @@ def __init__(self, hs, device_handler): # Attempt to resync out of sync device lists every 30s. 
self._resync_retry_in_progress = False - self.clock.looping_call( - run_as_background_process, - 30 * 1000, - func=self._maybe_retry_device_resync, - desc="_maybe_retry_device_resync", - ) + if hs.config.run_background_tasks: + self.clock.looping_call( + run_as_background_process, + 30 * 1000, + func=self._maybe_retry_device_resync, + desc="_maybe_retry_device_resync", + ) @trace async def incoming_device_list_update(self, origin, edu_content): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a8fe5cf4e2eb..bac0a056fd51 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -413,7 +413,7 @@ def __init__(self, hs: "HomeServer"): self._consent_uri_builder = ConsentURIBuilder(self.config) if ( - not self.config.worker_app + self.config.run_background_tasks and self.config.cleanup_extremities_with_dummy_events ): self.clock.looping_call( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index a0b3bdb5e0c3..563584a4bbc2 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -92,7 +92,7 @@ def __init__(self, hs: "HomeServer"): self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max - if hs.config.retention_enabled: + if hs.config.run_background_tasks and hs.config.retention_enabled: # Run the purge jobs described in the configuration file. for job in hs.config.retention_purge_jobs: logger.info("Setting up purge job with config: %s", job) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 5453e6dfc87a..55aa1e017ddc 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -371,11 +371,10 @@ class MasterProfileHandler(BaseProfileHandler): def __init__(self, hs): super().__init__(hs) - assert hs.config.worker_app is None - - self.clock.looping_call( - self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS - ) + if hs.config.run_background_tasks: + self.clock.looping_call( + self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS + ) def _start_update_remote_profile_cache(self): return run_as_background_process( diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 249ffe2a55c8..dc62b21c06f9 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -49,7 +49,7 @@ def __init__(self, hs): # Guard to ensure we only process deltas one at a time self._is_processing = False - if hs.config.stats_enabled: + if self.stats_enabled and hs.config.run_background_tasks: self.notifier.add_replication_callback(self.notify_new_event) # We kick this off so that we don't have to wait for a change before diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index f211ddbaf88e..5f7da9d4f1d7 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -40,7 +40,10 @@ def _censor_redactions(): "_censor_redactions", self._censor_redactions ) - if self.hs.config.redaction_retention_period is not None: + if ( + self.hs.config.run_background_tasks + and self.hs.config.redaction_retention_period is not None + ): hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) async def _censor_redactions(self): diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 239c7a949cba..1ae25dff0b1e 100644 --- a/synapse/storage/databases/main/client_ips.py +++ 
b/synapse/storage/databases/main/client_ips.py @@ -372,7 +372,7 @@ def __init__(self, database: DatabasePool, db_conn, hs): "before", "shutdown", self._update_client_ips_batch ) - if self.user_ips_max_age: + if hs.config.run_background_tasks and self.user_ips_max_age: self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) async def insert_client_ip( diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index c04374e43d11..41f5026acb91 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -834,7 +834,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): name="device_id_exists", keylen=2, max_entries=10000 ) - self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) + if hs.config.run_background_tasks: + self._clock.looping_call( + self._prune_old_outbound_device_pokes, 60 * 60 * 1000 + ) async def store_device( self, user_id: str, device_id: str, initial_device_display_name: str diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 6d3689c09e59..b5f6e8ac6042 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -606,9 +606,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth ) - hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 - ) + if hs.config.run_background_tasks: + hs.get_clock().looping_call( + self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + ) def _delete_old_forward_extrem_cache(self): def _delete_old_forward_extrem_cache_txn(txn): diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 62f1738732f3..e9515fa24573 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -679,9 +679,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): ) self._doing_notif_rotation = False - self._rotate_notif_loop = self._clock.looping_call( - self._start_rotate_notifs, 30 * 60 * 1000 - ) + if hs.config.run_background_tasks: + self._rotate_notif_loop = self._clock.looping_call( + self._start_rotate_notifs, 30 * 60 * 1000 + ) async def get_push_actions_for_user( self, user_id, before=None, limit=50, only_highlight=False @@ -741,7 +742,7 @@ def _remove_old_push_actions_before_txn( users can still get a list of recent highlights. 
Args: - txn: The transcation + txn: The transaction room_id: Room ID to delete from user_id: user ID to delete for stream_ordering: The lowest stream ordering which will diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 686052bd83c0..f90dded8c44a 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -52,7 +52,8 @@ def read_forward_extremities(): "read_forward_extremities", self._read_forward_extremities ) - hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + if hs.config.run_background_tasks: + hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) async def _read_forward_extremities(self): def fetch(txn): diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 675e81fe3436..ca8599d7de59 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -914,7 +914,8 @@ def start_cull(): self.cull_expired_threepid_validation_tokens, ) - hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) + if hs.config.run_background_tasks: + hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) async def add_access_token_to_user( self, diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 4fa8767b012e..6716905c249b 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -69,7 +69,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._check_safe_current_state_events_membership_updated_txn(txn) txn.close() - if self.hs.config.metrics_flags.known_servers: + if ( + self.hs.config.run_background_tasks + and self.hs.config.metrics_flags.known_servers + ): self._known_servers_count = 1 self.hs.get_clock().looping_call( run_as_background_process, diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 97aed1500e3e..31044e891d05 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -50,7 +50,8 @@ class TransactionStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) - self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) + if hs.config.run_background_tasks: + self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) self._destination_retry_cache = ExpiringCache( cache_name="get_destination_retry_timings",
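
For reference, the pattern applied across the handler and storage hunks above is sketched below: a periodic maintenance loop is only scheduled when the current instance is configured to run background tasks. This is a minimal illustration, not part of the patch; ExampleCleanupStore and _prune_stale_rows are hypothetical names, while hs.config.run_background_tasks, looping_call, and run_as_background_process are the real hooks used throughout the diff.

    from synapse.metrics.background_process_metrics import run_as_background_process

    CLEANUP_INTERVAL_MS = 60 * 60 * 1000  # hourly, matching most loops in this patch


    class ExampleCleanupStore:
        """Hypothetical store showing how periodic loops are gated in this change."""

        def __init__(self, database, db_conn, hs):
            self._clock = hs.get_clock()

            # Only the instance that owns background tasks schedules the loop:
            # the main process by default, or the dedicated background worker
            # once ``run_background_tasks: false`` is set on the main process.
            if hs.config.run_background_tasks:
                self._clock.looping_call(
                    self._start_prune_stale_rows, CLEANUP_INTERVAL_MS
                )

        def _start_prune_stale_rows(self):
            # Wrap the coroutine so it is tracked like the other background
            # processes touched by this patch.
            return run_as_background_process(
                "prune_stale_rows", self._prune_stale_rows
            )

        async def _prune_stale_rows(self):
            # Placeholder for the actual cleanup work (hypothetical).
            pass

In deployment terms, the generic_worker.py hunk above implies the main process sets ``run_background_tasks: false`` and a single worker is started with ``worker_app: synapse.app.background_worker``; that worker then has run_background_tasks forced on, while all other worker types have it forced off.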