From 71ade0999bc1975dd2de2e0e1d7473ea6838b06a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 21 Sep 2020 13:41:27 -0400 Subject: [PATCH 01/19] Allow background tasks to be run on a separate worker. --- changelog.d/8369.feature | 1 + synapse/app/generic_worker.py | 17 +++++++++++++++++ synapse/app/homeserver.py | 14 +++++++++----- synapse/config/workers.py | 10 ++++++++++ synapse/handlers/account_validity.py | 1 + synapse/handlers/auth.py | 2 +- synapse/handlers/deactivate_account.py | 2 +- synapse/handlers/device.py | 13 +++++++------ synapse/handlers/message.py | 2 +- synapse/handlers/pagination.py | 2 +- synapse/handlers/profile.py | 9 ++++----- synapse/handlers/stats.py | 2 +- synapse/storage/databases/main/censor_events.py | 5 ++++- synapse/storage/databases/main/client_ips.py | 2 +- synapse/storage/databases/main/devices.py | 5 ++++- .../storage/databases/main/event_federation.py | 7 ++++--- .../databases/main/event_push_actions.py | 9 +++++---- synapse/storage/databases/main/metrics.py | 3 ++- synapse/storage/databases/main/registration.py | 3 ++- synapse/storage/databases/main/roommember.py | 5 ++++- synapse/storage/databases/main/transactions.py | 3 ++- 21 files changed, 82 insertions(+), 35 deletions(-) create mode 100644 changelog.d/8369.feature diff --git a/changelog.d/8369.feature b/changelog.d/8369.feature new file mode 100644 index 000000000000..542993110bc8 --- /dev/null +++ b/changelog.d/8369.feature @@ -0,0 +1 @@ +Allow running background tasks in a separate worker process. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index c38413c8937b..9ac567607ff2 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -885,6 +885,7 @@ def start(config_options): # For backwards compatibility let any of the old app names. 
assert config.worker_app in ( "synapse.app.appservice", + "synapse.app.background_worker", "synapse.app.client_reader", "synapse.app.event_creator", "synapse.app.federation_reader", @@ -961,6 +962,22 @@ def start(config_options): # For other worker types we force this to off. config.worker.send_federation = False + if config.worker_app == "synapse.app.background_worker": + if config.worker.run_background_tasks: + sys.stderr.write( + "\nThe run_background_tasks must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``run_background_tasks: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the background tasks to start since they will be disabled in the main config + config.worker.run_background_tasks = True + else: + # For other worker types we force this to off. + config.worker.run_background_tasks = False + synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts hs = GenericWorkerServer( diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index dff739e1062d..9ef438bec529 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -620,7 +620,8 @@ def generate_user_daily_visit_stats(): # Rather than update on per session basis, batch up the requests. 
# If you increase the loop period, the accuracy of user_daily_visits # table will decrease - clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) + if hs.config.run_background_tasks: + clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) # monthly active user limiting functionality def reap_monthly_active_users(): @@ -628,8 +629,9 @@ def reap_monthly_active_users(): "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users ) - clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) - reap_monthly_active_users() + if hs.config.run_background_tasks: + clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) + reap_monthly_active_users() async def generate_monthly_active_users(): current_mau_count = 0 @@ -656,11 +658,13 @@ def start_generate_monthly_active_users(): ) start_generate_monthly_active_users() - if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: + if hs.config.run_background_tasks and ( + hs.config.limit_usage_by_mau or hs.config.mau_stats_only + ): clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) # End of monthly active user settings - if hs.config.report_stats: + if hs.config.run_background_tasks and hs.config.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index f23e42cdf98c..3df07c83bd9d 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -132,6 +132,16 @@ def read_config(self, config, **kwargs): self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events) + # Whether this worker should run background tasks or not. + # + # As a note for developers, the background tasks guarded by this should + # be able to run on only a single instance (meaning that they don't + # depend on any in-memory state of a particular worker). 
+ # + # Effort is not made to ensure only a single instance of these tasks is + # running. + self.run_background_tasks = config.get("run_background_tasks", True) + def generate_config_section(self, config_dir_path, server_name, **kwargs): return """\ ## Workers ## diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index 4caf6d591af5..abab471f5d5d 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -42,6 +42,7 @@ def __init__(self, hs): if ( self._account_validity.enabled and self._account_validity.renew_by_email_enabled + and hs.config.run_background_tasks ): # Don't do email-specific configuration if renewal by email is disabled. self._template_html = self.config.account_validity_template_html diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 0322b60cfc63..698a2495c2d3 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -203,7 +203,7 @@ def __init__(self, hs): self._clock = self.hs.get_clock() # Expire old UI auth sessions after a period of time. - if hs.config.worker_app is None: + if hs.config.run_background_tasks: self._clock.looping_call( run_as_background_process, 5 * 60 * 1000, diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 0635ad570866..8c29c4e97298 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -42,7 +42,7 @@ def __init__(self, hs): # Start the user parter loop so it can resume parting users from rooms where # it left off (if it has work left to do). 
- if hs.config.worker_app is None: + if hs.config.run_background_tasks: hs.get_reactor().callWhenRunning(self._start_user_parting) self._account_validity_enabled = hs.config.account_validity.enabled diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 55a978743988..19fd6bb0c799 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -522,12 +522,13 @@ def __init__(self, hs, device_handler): # Attempt to resync out of sync device lists every 30s. self._resync_retry_in_progress = False - self.clock.looping_call( - run_as_background_process, - 30 * 1000, - func=self._maybe_retry_device_resync, - desc="_maybe_retry_device_resync", - ) + if hs.config.run_background_tasks: + self.clock.looping_call( + run_as_background_process, + 30 * 1000, + func=self._maybe_retry_device_resync, + desc="_maybe_retry_device_resync", + ) @trace async def incoming_device_list_update(self, origin, edu_content): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a8fe5cf4e2eb..bac0a056fd51 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -413,7 +413,7 @@ def __init__(self, hs: "HomeServer"): self._consent_uri_builder = ConsentURIBuilder(self.config) if ( - not self.config.worker_app + self.config.run_background_tasks and self.config.cleanup_extremities_with_dummy_events ): self.clock.looping_call( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index a0b3bdb5e0c3..563584a4bbc2 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -92,7 +92,7 @@ def __init__(self, hs: "HomeServer"): self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max - if hs.config.retention_enabled: + if hs.config.run_background_tasks and hs.config.retention_enabled: # Run the purge jobs described in the configuration file. 
for job in hs.config.retention_purge_jobs: logger.info("Setting up purge job with config: %s", job) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 5453e6dfc87a..55aa1e017ddc 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -371,11 +371,10 @@ class MasterProfileHandler(BaseProfileHandler): def __init__(self, hs): super().__init__(hs) - assert hs.config.worker_app is None - - self.clock.looping_call( - self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS - ) + if hs.config.run_background_tasks: + self.clock.looping_call( + self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS + ) def _start_update_remote_profile_cache(self): return run_as_background_process( diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 249ffe2a55c8..dc62b21c06f9 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -49,7 +49,7 @@ def __init__(self, hs): # Guard to ensure we only process deltas one at a time self._is_processing = False - if hs.config.stats_enabled: + if self.stats_enabled and hs.config.run_background_tasks: self.notifier.add_replication_callback(self.notify_new_event) # We kick this off so that we don't have to wait for a change before diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index f211ddbaf88e..5f7da9d4f1d7 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -40,7 +40,10 @@ def _censor_redactions(): "_censor_redactions", self._censor_redactions ) - if self.hs.config.redaction_retention_period is not None: + if ( + self.hs.config.run_background_tasks + and self.hs.config.redaction_retention_period is not None + ): hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) async def _censor_redactions(self): diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 
239c7a949cba..1ae25dff0b1e 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -372,7 +372,7 @@ def __init__(self, database: DatabasePool, db_conn, hs): "before", "shutdown", self._update_client_ips_batch ) - if self.user_ips_max_age: + if hs.config.run_background_tasks and self.user_ips_max_age: self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) async def insert_client_ip( diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index c04374e43d11..41f5026acb91 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -834,7 +834,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): name="device_id_exists", keylen=2, max_entries=10000 ) - self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) + if hs.config.run_background_tasks: + self._clock.looping_call( + self._prune_old_outbound_device_pokes, 60 * 60 * 1000 + ) async def store_device( self, user_id: str, device_id: str, initial_device_display_name: str diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 6d3689c09e59..b5f6e8ac6042 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -606,9 +606,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth ) - hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 - ) + if hs.config.run_background_tasks: + hs.get_clock().looping_call( + self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + ) def _delete_old_forward_extrem_cache(self): def _delete_old_forward_extrem_cache_txn(txn): diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 
62f1738732f3..e9515fa24573 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -679,9 +679,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): ) self._doing_notif_rotation = False - self._rotate_notif_loop = self._clock.looping_call( - self._start_rotate_notifs, 30 * 60 * 1000 - ) + if hs.config.run_background_tasks: + self._rotate_notif_loop = self._clock.looping_call( + self._start_rotate_notifs, 30 * 60 * 1000 + ) async def get_push_actions_for_user( self, user_id, before=None, limit=50, only_highlight=False @@ -741,7 +742,7 @@ def _remove_old_push_actions_before_txn( users can still get a list of recent highlights. Args: - txn: The transcation + txn: The transaction room_id: Room ID to delete from user_id: user ID to delete for stream_ordering: The lowest stream ordering which will diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 686052bd83c0..f90dded8c44a 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -52,7 +52,8 @@ def read_forward_extremities(): "read_forward_extremities", self._read_forward_extremities ) - hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + if hs.config.run_background_tasks: + hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) async def _read_forward_extremities(self): def fetch(txn): diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 675e81fe3436..ca8599d7de59 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -914,7 +914,8 @@ def start_cull(): self.cull_expired_threepid_validation_tokens, ) - hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) + if hs.config.run_background_tasks: + hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) async def 
add_access_token_to_user( self, diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 4fa8767b012e..6716905c249b 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -69,7 +69,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._check_safe_current_state_events_membership_updated_txn(txn) txn.close() - if self.hs.config.metrics_flags.known_servers: + if ( + self.hs.config.run_background_tasks + and self.hs.config.metrics_flags.known_servers + ): self._known_servers_count = 1 self.hs.get_clock().looping_call( run_as_background_process, diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 97aed1500e3e..31044e891d05 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -50,7 +50,8 @@ class TransactionStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) - self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) + if hs.config.run_background_tasks: + self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) self._destination_retry_cache = ExpiringCache( cache_name="get_destination_retry_timings", From 094b11edc72ae956db183203f3f4a4d1d8c6e0b5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 22 Sep 2020 13:27:38 -0400 Subject: [PATCH 02/19] Only run function on background worker. 
--- synapse/app/homeserver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 9ef438bec529..5c7498ad1af7 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -657,10 +657,10 @@ def start_generate_monthly_active_users(): "generate_monthly_active_users", generate_monthly_active_users ) - start_generate_monthly_active_users() if hs.config.run_background_tasks and ( hs.config.limit_usage_by_mau or hs.config.mau_stats_only ): + start_generate_monthly_active_users() clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) # End of monthly active user settings From fd8aad312aa093f83afe6dd6d19612b091172b04 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 22 Sep 2020 14:08:36 -0400 Subject: [PATCH 03/19] Accept a worker name instead of using a flag. --- docs/sample_config.yaml | 6 ++++++ synapse/config/workers.py | 16 +++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index fb04ff283dee..c95e49c3fe08 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -2458,6 +2458,12 @@ opentracing: # events: worker1 # typing: worker1 +# The worker that is used to run background tasks (e.g. cleaning up expired +# data). This should be one of the workers from `instance_map`. If not +# provided this defaults to the main process. +# +#run_background_tasks: worker1 + # Configuration for Redis when using workers. This *must* be enabled when # using workers (unless using old style direct TCP configuration). diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 3df07c83bd9d..1aff1f7aa93d 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -140,7 +140,15 @@ def read_config(self, config, **kwargs): # # Effort is not made to ensure only a single instance of these tasks is # running. 
- self.run_background_tasks = config.get("run_background_tasks", True) + instance = config.get("run_background_tasks_on") or "master" + if instance != "master" and instance not in self.instance_map: + raise ConfigError( + "Instance %r is configured to run background tasks but does not appear in `instance_map` config." + % (instance,) + ) + self.run_background_tasks = ( + self.worker_name is None and instance == "master" + ) or self.worker_name == instance def generate_config_section(self, config_dir_path, server_name, **kwargs): return """\ @@ -177,6 +185,12 @@ def generate_config_section(self, config_dir_path, server_name, **kwargs): #stream_writers: # events: worker1 # typing: worker1 + + # The worker that is used to run background tasks (e.g. cleaning up expired + # data). This should be one of the workers from `instance_map`. If not + # provided this defaults to the main process. + # + #run_background_tasks: worker1 """ def read_arguments(self, args): From 67c6fa408a7669f56219a915a79bbfb28f55a2fb Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 22 Sep 2020 14:50:42 -0400 Subject: [PATCH 04/19] Remove check for being the background_worker app. --- synapse/app/generic_worker.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 9ac567607ff2..14e06b41a691 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -962,22 +962,6 @@ def start(config_options): # For other worker types we force this to off. config.worker.send_federation = False - if config.worker_app == "synapse.app.background_worker": - if config.worker.run_background_tasks: - sys.stderr.write( - "\nThe run_background_tasks must be disabled in the main synapse process" - "\nbefore they can be run in a separate worker." 
- "\nPlease add ``run_background_tasks: false`` to the main config" - "\n" - ) - sys.exit(1) - - # Force the background tasks to start since they will be disabled in the main config - config.worker.run_background_tasks = True - else: - # For other worker types we force this to off. - config.worker.run_background_tasks = False - synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts hs = GenericWorkerServer( From 08a7d5d69ba5c5cfe96c15f5556c23f7039ffa04 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 23 Sep 2020 07:26:20 -0400 Subject: [PATCH 05/19] Do not allow a non-existent worker app. --- synapse/app/generic_worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 14e06b41a691..c38413c8937b 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -885,7 +885,6 @@ def start(config_options): # For backwards compatibility let any of the old app names. assert config.worker_app in ( "synapse.app.appservice", - "synapse.app.background_worker", "synapse.app.client_reader", "synapse.app.event_creator", "synapse.app.federation_reader", From 0d06a87c9834dd0e3a08a1f1cb0291d02fba7dbf Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 24 Sep 2020 14:25:32 -0400 Subject: [PATCH 06/19] Also run the user directory updating in the backgronud. 
--- synapse/handlers/user_directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 79393c8829fc..f803f7e702d5 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -54,7 +54,7 @@ def __init__(self, hs): # Guard to ensure we only process deltas one at a time self._is_processing = False - if self.update_user_directory: + if hs.config.run_background_tasks and self.update_user_directory: self.notifier.add_replication_callback(self.notify_new_event) # We kick this off so that we don't have to wait for a change before From 2e98b7817684e44b5b073cb4b0e06fd4f45c42fb Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 24 Sep 2020 14:39:33 -0400 Subject: [PATCH 07/19] Ensure the proper handlers are loaded during start-up. --- synapse/app/homeserver.py | 2 -- synapse/server.py | 21 +++++++++++++++++---- tests/utils.py | 2 +- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 5c7498ad1af7..e615a1a077f8 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -389,8 +389,6 @@ def setup(config_options): except UpgradeDatabaseException as e: quit_with_error("Failed to upgrade database: %s" % (e,)) - hs.setup_master() - async def do_acme() -> bool: """ Reprovision an ACME certificate, if it's required. diff --git a/synapse/server.py b/synapse/server.py index 5e3752c3334f..3843a72e87eb 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -185,7 +185,17 @@ class HomeServer(metaclass=abc.ABCMeta): we are listening on to provide HTTP services. 
""" - REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"] + REQUIRED_ON_BACKGROUND_TASK_STARTUP = [ + "account_validity", + "auth", + "deactivate_account", + "device", + "message", + "pagination", + "profile", + "stats", + "user_directory", + ] # This is overridden in derived application classes # (such as synapse.app.homeserver.SynapseHomeServer) and gives the class to be @@ -251,14 +261,17 @@ def setup(self) -> None: self.datastores = Databases(self.DATASTORE_CLASS, self) logger.info("Finished setting up.") - def setup_master(self) -> None: + if self.config.run_background_tasks: + self.setup_background_tasks() + + def setup_background_tasks(self) -> None: """ Some handlers have side effects on instantiation (like registering background updates). This function causes them to be fetched, and therefore instantiated, to run those side effects. """ - for i in self.REQUIRED_ON_MASTER_STARTUP: - getattr(self, "get_" + i)() + for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP: + getattr(self, "get_" + i + "_handler")() def get_reactor(self) -> twisted.internet.base.ReactorBase: """ diff --git a/tests/utils.py b/tests/utils.py index 4673872f8890..7a927c7f7421 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -276,7 +276,7 @@ def setup_test_homeserver( hs.setup() if homeserverToUse.__name__ == "TestHomeServer": - hs.setup_master() + hs.setup_background_tasks() if isinstance(db_engine, PostgresEngine): database = hs.get_datastores().databases[0] From b77b89b044df817bed15fb14e0d534c113a3e03f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 25 Sep 2020 07:28:41 -0400 Subject: [PATCH 08/19] Backout some of the changes to simplify the PR. 
--- synapse/handlers/account_validity.py | 1 - synapse/handlers/deactivate_account.py | 2 +- synapse/handlers/device.py | 13 ++++++------- synapse/handlers/message.py | 2 +- synapse/handlers/pagination.py | 2 +- synapse/handlers/profile.py | 9 +++++---- synapse/server.py | 6 ------ synapse/storage/databases/main/censor_events.py | 5 +---- synapse/storage/databases/main/client_ips.py | 2 +- synapse/storage/databases/main/devices.py | 5 +---- synapse/storage/databases/main/event_federation.py | 7 +++---- .../storage/databases/main/event_push_actions.py | 9 ++++----- synapse/storage/databases/main/metrics.py | 3 +-- synapse/storage/databases/main/registration.py | 3 +-- synapse/storage/databases/main/roommember.py | 5 +---- synapse/storage/databases/main/transactions.py | 3 +-- 16 files changed, 28 insertions(+), 49 deletions(-) diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index abab471f5d5d..4caf6d591af5 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -42,7 +42,6 @@ def __init__(self, hs): if ( self._account_validity.enabled and self._account_validity.renew_by_email_enabled - and hs.config.run_background_tasks ): # Don't do email-specific configuration if renewal by email is disabled. self._template_html = self.config.account_validity_template_html diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 8c29c4e97298..0635ad570866 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -42,7 +42,7 @@ def __init__(self, hs): # Start the user parter loop so it can resume parting users from rooms where # it left off (if it has work left to do). 
- if hs.config.run_background_tasks: + if hs.config.worker_app is None: hs.get_reactor().callWhenRunning(self._start_user_parting) self._account_validity_enabled = hs.config.account_validity.enabled diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 19fd6bb0c799..55a978743988 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -522,13 +522,12 @@ def __init__(self, hs, device_handler): # Attempt to resync out of sync device lists every 30s. self._resync_retry_in_progress = False - if hs.config.run_background_tasks: - self.clock.looping_call( - run_as_background_process, - 30 * 1000, - func=self._maybe_retry_device_resync, - desc="_maybe_retry_device_resync", - ) + self.clock.looping_call( + run_as_background_process, + 30 * 1000, + func=self._maybe_retry_device_resync, + desc="_maybe_retry_device_resync", + ) @trace async def incoming_device_list_update(self, origin, edu_content): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index bac0a056fd51..a8fe5cf4e2eb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -413,7 +413,7 @@ def __init__(self, hs: "HomeServer"): self._consent_uri_builder = ConsentURIBuilder(self.config) if ( - self.config.run_background_tasks + not self.config.worker_app and self.config.cleanup_extremities_with_dummy_events ): self.clock.looping_call( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 563584a4bbc2..a0b3bdb5e0c3 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -92,7 +92,7 @@ def __init__(self, hs: "HomeServer"): self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max - if hs.config.run_background_tasks and hs.config.retention_enabled: + if hs.config.retention_enabled: # Run the purge jobs described in the configuration file. 
for job in hs.config.retention_purge_jobs: logger.info("Setting up purge job with config: %s", job) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 55aa1e017ddc..5453e6dfc87a 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -371,10 +371,11 @@ class MasterProfileHandler(BaseProfileHandler): def __init__(self, hs): super().__init__(hs) - if hs.config.run_background_tasks: - self.clock.looping_call( - self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS - ) + assert hs.config.worker_app is None + + self.clock.looping_call( + self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS + ) def _start_update_remote_profile_cache(self): return run_as_background_process( diff --git a/synapse/server.py b/synapse/server.py index 3843a72e87eb..bd5a0fb68d0e 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -186,13 +186,7 @@ class HomeServer(metaclass=abc.ABCMeta): """ REQUIRED_ON_BACKGROUND_TASK_STARTUP = [ - "account_validity", "auth", - "deactivate_account", - "device", - "message", - "pagination", - "profile", "stats", "user_directory", ] diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 5f7da9d4f1d7..f211ddbaf88e 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -40,10 +40,7 @@ def _censor_redactions(): "_censor_redactions", self._censor_redactions ) - if ( - self.hs.config.run_background_tasks - and self.hs.config.redaction_retention_period is not None - ): + if self.hs.config.redaction_retention_period is not None: hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) async def _censor_redactions(self): diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 1ae25dff0b1e..239c7a949cba 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ 
-372,7 +372,7 @@ def __init__(self, database: DatabasePool, db_conn, hs): "before", "shutdown", self._update_client_ips_batch ) - if hs.config.run_background_tasks and self.user_ips_max_age: + if self.user_ips_max_age: self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) async def insert_client_ip( diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 41f5026acb91..c04374e43d11 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -834,10 +834,7 @@ def __init__(self, database: DatabasePool, db_conn, hs): name="device_id_exists", keylen=2, max_entries=10000 ) - if hs.config.run_background_tasks: - self._clock.looping_call( - self._prune_old_outbound_device_pokes, 60 * 60 * 1000 - ) + self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) async def store_device( self, user_id: str, device_id: str, initial_device_display_name: str diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index b5f6e8ac6042..6d3689c09e59 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -606,10 +606,9 @@ def __init__(self, database: DatabasePool, db_conn, hs): self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth ) - if hs.config.run_background_tasks: - hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 - ) + hs.get_clock().looping_call( + self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + ) def _delete_old_forward_extrem_cache(self): def _delete_old_forward_extrem_cache_txn(txn): diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index e9515fa24573..62f1738732f3 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -679,10 
+679,9 @@ def __init__(self, database: DatabasePool, db_conn, hs): ) self._doing_notif_rotation = False - if hs.config.run_background_tasks: - self._rotate_notif_loop = self._clock.looping_call( - self._start_rotate_notifs, 30 * 60 * 1000 - ) + self._rotate_notif_loop = self._clock.looping_call( + self._start_rotate_notifs, 30 * 60 * 1000 + ) async def get_push_actions_for_user( self, user_id, before=None, limit=50, only_highlight=False @@ -742,7 +741,7 @@ def _remove_old_push_actions_before_txn( users can still get a list of recent highlights. Args: - txn: The transaction + txn: The transaction room_id: Room ID to delete from user_id: user ID to delete for stream_ordering: The lowest stream ordering which will diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index f90dded8c44a..686052bd83c0 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -52,8 +52,7 @@ def read_forward_extremities(): "read_forward_extremities", self._read_forward_extremities ) - if hs.config.run_background_tasks: - hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) async def _read_forward_extremities(self): def fetch(txn): diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index ca8599d7de59..675e81fe3436 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -914,8 +914,7 @@ def start_cull(): self.cull_expired_threepid_validation_tokens, ) - if hs.config.run_background_tasks: - hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) + hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) async def add_access_token_to_user( self, diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 6716905c249b..4fa8767b012e 100644 
--- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -69,10 +69,7 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._check_safe_current_state_events_membership_updated_txn(txn) txn.close() - if ( - self.hs.config.run_background_tasks - and self.hs.config.metrics_flags.known_servers - ): + if self.hs.config.metrics_flags.known_servers: self._known_servers_count = 1 self.hs.get_clock().looping_call( run_as_background_process, diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 31044e891d05..97aed1500e3e 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -50,8 +50,7 @@ class TransactionStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) - if hs.config.run_background_tasks: - self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) + self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) self._destination_retry_cache = ExpiringCache( cache_name="get_destination_retry_timings", From f26b92eec7b8124ec2b293c16bb2daa62ce32042 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 25 Sep 2020 07:30:58 -0400 Subject: [PATCH 09/19] Ensure that the background tasks have access to the proper stores. 
--- synapse/app/generic_worker.py | 2 ++ synapse/storage/databases/main/ui_auth.py | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index c38413c8937b..094f07274cdf 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -133,6 +133,7 @@ ) from synapse.storage.databases.main.presence import UserPresenceState from synapse.storage.databases.main.search import SearchWorkerStore +from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore from synapse.types import ReadReceipt @@ -454,6 +455,7 @@ class GenericWorkerSlavedStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. UserDirectoryStore, + StatsStore, UIAuthWorkerStore, SlavedDeviceInboxStore, SlavedDeviceStore, diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py index 3b9211a6d235..79b7ece3302a 100644 --- a/synapse/storage/databases/main/ui_auth.py +++ b/synapse/storage/databases/main/ui_auth.py @@ -288,8 +288,6 @@ async def get_user_agents_ips_to_ui_auth_session( ) return [(row["user_agent"], row["ip"]) for row in rows] - -class UIAuthStore(UIAuthWorkerStore): async def delete_old_ui_auth_sessions(self, expiration_time: int) -> None: """ Remove sessions which were last used earlier than the expiration time. @@ -339,3 +337,7 @@ def _delete_old_ui_auth_sessions_txn( iterable=session_ids, keyvalues={}, ) + + +class UIAuthStore(UIAuthWorkerStore): + pass From 53a44024ac643ba94d777325e46c994a49dc355a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 25 Sep 2020 08:20:36 -0400 Subject: [PATCH 10/19] Add some documentation. 
--- docs/workers.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/workers.md b/docs/workers.md index df0ac84d9466..e168a4274fdc 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -303,6 +303,24 @@ stream_writers: events: event_persister1 ``` +#### Background tasks + +There is also *experimental* support for moving background tasks to a separate +worker. These are either run periodically or started via replication and which +tasks are run depend on your Synapse configuration (e.g. if stats is enabled). + +To enable this, the worker must have a `worker_name` and be listed in the +`instance_map` config. For example, to move background tasks to a dedicated +worker, the shared configuration would include: + +```yaml +instance_map: + background_worker: + host: localhost + port: 8035 + +run_background_tasks: background_worker +``` ### `synapse.app.pusher` From 82d167b68e276425563a09a3bb5ad643d1619395 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 25 Sep 2020 08:29:38 -0400 Subject: [PATCH 11/19] Do not require a replication endpoint for the instance map. --- docs/workers.md | 4 +--- synapse/config/workers.py | 6 ++++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/workers.md b/docs/workers.md index e168a4274fdc..42b5f4b6c2e6 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -315,9 +315,7 @@ worker, the shared configuration would include: ```yaml instance_map: - background_worker: - host: localhost - port: 8035 + background_worker: null run_background_tasks: background_worker ``` diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 1aff1f7aa93d..6c566ad4294d 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -109,10 +109,12 @@ def read_config(self, config, **kwargs): federation_sender_instances ) - # A map from instance name to host/port of their HTTP replication endpoint. 
+ # A map from instance name to host/port of their HTTP replication endpoint + # (or None if there's no HTTP replication endpoint). instance_map = config.get("instance_map") or {} self.instance_map = { - name: InstanceLocationConfig(**c) for name, c in instance_map.items() + name: InstanceLocationConfig(**c) if c else None + for name, c in instance_map.items() } # Map from type of streams to source, c.f. WriterLocations. From 0c9e9703b1ee0cf591a0b82ed390f1decfc0b95b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 25 Sep 2020 08:45:16 -0400 Subject: [PATCH 12/19] Clarify confusing wording. --- docs/workers.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/workers.md b/docs/workers.md index 42b5f4b6c2e6..3bb679fa37aa 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -306,8 +306,9 @@ stream_writers: #### Background tasks There is also *experimental* support for moving background tasks to a separate -worker. These are either run periodically or started via replication and which -tasks are run depend on your Synapse configuration (e.g. if stats is enabled). +worker. Background tasks are run periodically or started via replication. Exactly +which tasks are configured to run depends on your Synapse configuration (e.g. if +stats is enabled). To enable this, the worker must have a `worker_name` and be listed in the `instance_map` config. For example, to move background tasks to a dedicated From d40aff7343fdd1da90335baa421ca330aad70f3e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 29 Sep 2020 07:47:28 -0400 Subject: [PATCH 13/19] Do not require the background worker instance to be in the instance map. --- docs/workers.md | 11 ++++------- synapse/config/workers.py | 13 +++---------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/docs/workers.md b/docs/workers.md index 3bb679fa37aa..87c10f7fd3eb 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -310,15 +310,12 @@ worker. 
Background tasks are run periodically or started via replication. Exactl which tasks are configured to run depends on your Synapse configuration (e.g. if stats is enabled). -To enable this, the worker must have a `worker_name` and be listed in the -`instance_map` config. For example, to move background tasks to a dedicated -worker, the shared configuration would include: +To enable this, the worker must have a `worker_name` and can be configured to run +background tasks. For example, to move background tasks to a dedicated worker, +the shared configuration would include: ```yaml -instance_map: - background_worker: null - -run_background_tasks: background_worker +run_background_tasks_on: background_worker ``` ### `synapse.app.pusher` diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 6c566ad4294d..3034eef456af 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -109,12 +109,10 @@ def read_config(self, config, **kwargs): federation_sender_instances ) - # A map from instance name to host/port of their HTTP replication endpoint - # (or None if there's no HTTP replication endpoint). + # A map from instance name to host/port of their HTTP replication endpoint. instance_map = config.get("instance_map") or {} self.instance_map = { - name: InstanceLocationConfig(**c) if c else None - for name, c in instance_map.items() + name: InstanceLocationConfig(**c) for name, c in instance_map.items() } # Map from type of streams to source, c.f. WriterLocations. @@ -140,14 +138,9 @@ def read_config(self, config, **kwargs): # be able to run on only a single instance (meaning that they don't # depend on any in-memory state of a particular worker). # - # Effort is not made to ensure only a single instance of these tasks is + # No effort is made to ensure only a single instance of these tasks is # running. 
instance = config.get("run_background_tasks_on") or "master" - if instance != "master" and instance not in self.instance_map: - raise ConfigError( - "Instance %r is configured to run background tasks but does not appear in `instance_map` config." - % (instance,) - ) self.run_background_tasks = ( self.worker_name is None and instance == "master" ) or self.worker_name == instance From c015379de04c924561c93aaade8e01c138b2799d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 29 Sep 2020 07:49:00 -0400 Subject: [PATCH 14/19] Update the sample config. --- docs/sample_config.yaml | 5 ++--- synapse/config/workers.py | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index c95e49c3fe08..68f5dd8f3024 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -2459,10 +2459,9 @@ opentracing: # typing: worker1 # The worker that is used to run background tasks (e.g. cleaning up expired -# data). This should be one of the workers from `instance_map`. If not -# provided this defaults to the main process. +# data). If not provided this defaults to the main process. # -#run_background_tasks: worker1 +#run_background_tasks_on: worker1 # Configuration for Redis when using workers. This *must* be enabled when diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 3034eef456af..5014615a08cf 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -182,10 +182,9 @@ def generate_config_section(self, config_dir_path, server_name, **kwargs): # typing: worker1 # The worker that is used to run background tasks (e.g. cleaning up expired - # data). This should be one of the workers from `instance_map`. If not - # provided this defaults to the main process. + # data). If not provided this defaults to the main process. 
# - #run_background_tasks: worker1 + #run_background_tasks_on: worker1 """ def read_arguments(self, args): From cfe28f2729b69c1948aea127dfdd717dc475e591 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 29 Sep 2020 09:08:20 -0400 Subject: [PATCH 15/19] The user directory background tasks are controlled by a separate flag. --- docs/workers.md | 3 +++ synapse/handlers/user_directory.py | 2 +- synapse/server.py | 4 +++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/docs/workers.md b/docs/workers.md index 87c10f7fd3eb..3b4c16ecd026 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -318,6 +318,9 @@ the shared configuration would include: run_background_tasks_on: background_worker ``` +You might also wish to investigate the `update_user_directory` and +`media_instance_running_background_jobs` settings. + ### `synapse.app.pusher` Handles sending push notifications to sygnal and email. Doesn't handle any diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index f803f7e702d5..79393c8829fc 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -54,7 +54,7 @@ def __init__(self, hs): # Guard to ensure we only process deltas one at a time self._is_processing = False - if hs.config.run_background_tasks and self.update_user_directory: + if self.update_user_directory: self.notifier.add_replication_callback(self.notify_new_event) # We kick this off so that we don't have to wait for a change before diff --git a/synapse/server.py b/synapse/server.py index bd5a0fb68d0e..aa2273955cd4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -188,7 +188,6 @@ class HomeServer(metaclass=abc.ABCMeta): REQUIRED_ON_BACKGROUND_TASK_STARTUP = [ "auth", "stats", - "user_directory", ] # This is overridden in derived application classes @@ -255,6 +254,9 @@ def setup(self) -> None: self.datastores = Databases(self.DATASTORE_CLASS, self) logger.info("Finished setting up.") + # Register background tasks 
required by this server. This must be done + # somewhat manually due to the background tasks not being registered + # unless handlers are instantiated. if self.config.run_background_tasks: self.setup_background_tasks() From f0c83d909fae340f38ae73df7e4c0b83482c8869 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 30 Sep 2020 08:04:13 -0400 Subject: [PATCH 16/19] Rename instance variable. --- synapse/config/workers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 5014615a08cf..57ab097eba3e 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -140,10 +140,10 @@ def read_config(self, config, **kwargs): # # No effort is made to ensure only a single instance of these tasks is # running. - instance = config.get("run_background_tasks_on") or "master" + background_tasks_instance = config.get("run_background_tasks_on") or "master" self.run_background_tasks = ( - self.worker_name is None and instance == "master" - ) or self.worker_name == instance + self.worker_name is None and background_tasks_instance == "master" + ) or self.worker_name == background_tasks_instance def generate_config_section(self, config_dir_path, server_name, **kwargs): return """\ From b226c49dc5e6722724e85438fe6a07e12ca28d41 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 30 Sep 2020 10:27:03 -0400 Subject: [PATCH 17/19] Allow the phone home stats to be able to run on any worker. 
--- synapse/app/generic_worker.py | 6 + synapse/app/homeserver.py | 186 +---------------------------- synapse/app/phone_stats_home.py | 202 ++++++++++++++++++++++++++++++++ tests/test_phone_home.py | 2 +- 4 files changed, 213 insertions(+), 183 deletions(-) create mode 100644 synapse/app/phone_stats_home.py diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 094f07274cdf..e81bf8af4961 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -34,6 +34,7 @@ SERVER_KEY_V2_PREFIX, ) from synapse.app import _base +from synapse.app.phone_stats_home import start_phone_stats_home from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging @@ -975,6 +976,11 @@ def start(config_options): hs.setup() + # If background tasks are running on this worker, start collecting the phone + # home stats. + if hs.config.run_background_tasks: + start_phone_stats_home(hs) + # Ensure the replication streamer is always started in case we write to any # streams. Will no-op if no streams can be written to by this worker. 
hs.get_replication_streamer() diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e615a1a077f8..6b57d3024db3 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -17,14 +17,10 @@ import gc import logging -import math import os -import resource import sys from typing import Iterable -from prometheus_client import Gauge - from twisted.application import service from twisted.internet import defer, reactor from twisted.python.failure import Failure @@ -45,6 +41,7 @@ ) from synapse.app import _base from synapse.app._base import listen_ssl, listen_tcp, quit_with_error +from synapse.app.phone_stats_home import start_phone_stats_home from synapse.config._base import ConfigError from synapse.config.emailconfig import ThreepidBehaviour from synapse.config.homeserver import HomeServerConfig @@ -60,7 +57,6 @@ from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import ModuleApi from synapse.python_dependencies import check_requirements from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource @@ -334,20 +330,6 @@ def start_listening(self, listeners: Iterable[ListenerConfig]): logger.warning("Unrecognized listener type: %s", listener.type) -# Gauges to expose monthly active user control metrics -current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") -current_mau_by_service_gauge = Gauge( - "synapse_admin_mau_current_mau_by_service", - "Current MAU by service", - ["app_service"], -) -max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") -registered_reserved_users_mau_gauge = Gauge( - "synapse_admin_mau:registered_reserved_users", - "Registered users with reserved threepids", -) - - def setup(config_options): """ Args: @@ -484,92 +466,6 @@ def stopService(self): return 
self._port.stopListening() -# Contains the list of processes we will be monitoring -# currently either 0 or 1 -_stats_process = [] - - -async def phone_stats_home(hs, stats, stats_process=_stats_process): - logger.info("Gathering stats for reporting") - now = int(hs.get_clock().time()) - uptime = int(now - hs.start_time) - if uptime < 0: - uptime = 0 - - # - # Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test. - # - old = stats_process[0] - new = (now, resource.getrusage(resource.RUSAGE_SELF)) - stats_process[0] = new - - # Get RSS in bytes - stats["memory_rss"] = new[1].ru_maxrss - - # Get CPU time in % of a single core, not % of all cores - used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - ( - old[1].ru_utime + old[1].ru_stime - ) - if used_cpu_time == 0 or new[0] == old[0]: - stats["cpu_average"] = 0 - else: - stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100) - - # - # General statistics - # - - stats["homeserver"] = hs.config.server_name - stats["server_context"] = hs.config.server_context - stats["timestamp"] = now - stats["uptime_seconds"] = uptime - version = sys.version_info - stats["python_version"] = "{}.{}.{}".format( - version.major, version.minor, version.micro - ) - stats["total_users"] = await hs.get_datastore().count_all_users() - - total_nonbridged_users = await hs.get_datastore().count_nonbridged_users() - stats["total_nonbridged_users"] = total_nonbridged_users - - daily_user_type_results = await hs.get_datastore().count_daily_user_type() - for name, count in daily_user_type_results.items(): - stats["daily_user_type_" + name] = count - - room_count = await hs.get_datastore().get_room_count() - stats["total_room_count"] = room_count - - stats["daily_active_users"] = await hs.get_datastore().count_daily_users() - stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users() - stats["daily_active_rooms"] = await 
hs.get_datastore().count_daily_active_rooms() - stats["daily_messages"] = await hs.get_datastore().count_daily_messages() - - r30_results = await hs.get_datastore().count_r30_users() - for name, count in r30_results.items(): - stats["r30_users_" + name] = count - - daily_sent_messages = await hs.get_datastore().count_daily_sent_messages() - stats["daily_sent_messages"] = daily_sent_messages - stats["cache_factor"] = hs.config.caches.global_factor - stats["event_cache_size"] = hs.config.caches.event_cache_size - - # - # Database version - # - - # This only reports info about the *main* database. - stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__ - stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version - - logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats)) - try: - await hs.get_proxied_http_client().put_json( - hs.config.report_stats_endpoint, stats - ) - except Exception as e: - logger.warning("Error reporting stats: %s", e) - - def run(hs): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: @@ -595,84 +491,10 @@ def profiled(*args, **kargs): ThreadPool._worker = profile(ThreadPool._worker) reactor.run = profile(reactor.run) - clock = hs.get_clock() - - stats = {} - - def performance_stats_init(): - _stats_process.clear() - _stats_process.append( - (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF)) - ) - - def start_phone_stats_home(): - return run_as_background_process( - "phone_stats_home", phone_stats_home, hs, stats - ) - - def generate_user_daily_visit_stats(): - return run_as_background_process( - "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits - ) - - # Rather than update on per session basis, batch up the requests. - # If you increase the loop period, the accuracy of user_daily_visits - # table will decrease + # If background tasks are running on the main process, start collecting the + # phone home stats. 
if hs.config.run_background_tasks: - clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) - - # monthly active user limiting functionality - def reap_monthly_active_users(): - return run_as_background_process( - "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users - ) - - if hs.config.run_background_tasks: - clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) - reap_monthly_active_users() - - async def generate_monthly_active_users(): - current_mau_count = 0 - current_mau_count_by_service = {} - reserved_users = () - store = hs.get_datastore() - if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: - current_mau_count = await store.get_monthly_active_count() - current_mau_count_by_service = ( - await store.get_monthly_active_count_by_service() - ) - reserved_users = await store.get_registered_reserved_users() - current_mau_gauge.set(float(current_mau_count)) - - for app_service, count in current_mau_count_by_service.items(): - current_mau_by_service_gauge.labels(app_service).set(float(count)) - - registered_reserved_users_mau_gauge.set(float(len(reserved_users))) - max_mau_gauge.set(float(hs.config.max_mau_value)) - - def start_generate_monthly_active_users(): - return run_as_background_process( - "generate_monthly_active_users", generate_monthly_active_users - ) - - if hs.config.run_background_tasks and ( - hs.config.limit_usage_by_mau or hs.config.mau_stats_only - ): - start_generate_monthly_active_users() - clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) - # End of monthly active user settings - - if hs.config.run_background_tasks and hs.config.report_stats: - logger.info("Scheduling stats reporting for 3 hour intervals") - clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) - - # We need to defer this init for the cases that we daemonize - # otherwise the process ID we get is that of the non-daemon process - clock.call_later(0, performance_stats_init) - - # We wait 5 minutes 
to send the first set of stats as the server can - # be quite busy the first few minutes - clock.call_later(5 * 60, start_phone_stats_home) + start_phone_stats_home(hs) _base.start_reactor( "synapse-homeserver", diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py new file mode 100644 index 000000000000..2c8e14a8c0ca --- /dev/null +++ b/synapse/app/phone_stats_home.py @@ -0,0 +1,202 @@ +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import math +import resource +import sys + +from prometheus_client import Gauge + +from synapse.metrics.background_process_metrics import run_as_background_process + +logger = logging.getLogger("synapse.app.homeserver") + +# Contains the list of processes we will be monitoring +# currently either 0 or 1 +_stats_process = [] + +# Gauges to expose monthly active user control metrics +current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") +current_mau_by_service_gauge = Gauge( + "synapse_admin_mau_current_mau_by_service", + "Current MAU by service", + ["app_service"], +) +max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") +registered_reserved_users_mau_gauge = Gauge( + "synapse_admin_mau:registered_reserved_users", + "Registered users with reserved threepids", +) + + +async def phone_stats_home(hs, stats, stats_process=_stats_process): + logger.info("Gathering stats for reporting") + now = int(hs.get_clock().time()) + uptime = int(now - hs.start_time) + if uptime < 0: + uptime = 0 + + # + # Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test. 
+ # + old = stats_process[0] + new = (now, resource.getrusage(resource.RUSAGE_SELF)) + stats_process[0] = new + + # Get RSS in bytes + stats["memory_rss"] = new[1].ru_maxrss + + # Get CPU time in % of a single core, not % of all cores + used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - ( + old[1].ru_utime + old[1].ru_stime + ) + if used_cpu_time == 0 or new[0] == old[0]: + stats["cpu_average"] = 0 + else: + stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100) + + # + # General statistics + # + + stats["homeserver"] = hs.config.server_name + stats["server_context"] = hs.config.server_context + stats["timestamp"] = now + stats["uptime_seconds"] = uptime + version = sys.version_info + stats["python_version"] = "{}.{}.{}".format( + version.major, version.minor, version.micro + ) + stats["total_users"] = await hs.get_datastore().count_all_users() + + total_nonbridged_users = await hs.get_datastore().count_nonbridged_users() + stats["total_nonbridged_users"] = total_nonbridged_users + + daily_user_type_results = await hs.get_datastore().count_daily_user_type() + for name, count in daily_user_type_results.items(): + stats["daily_user_type_" + name] = count + + room_count = await hs.get_datastore().get_room_count() + stats["total_room_count"] = room_count + + stats["daily_active_users"] = await hs.get_datastore().count_daily_users() + stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users() + stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms() + stats["daily_messages"] = await hs.get_datastore().count_daily_messages() + + r30_results = await hs.get_datastore().count_r30_users() + for name, count in r30_results.items(): + stats["r30_users_" + name] = count + + daily_sent_messages = await hs.get_datastore().count_daily_sent_messages() + stats["daily_sent_messages"] = daily_sent_messages + stats["cache_factor"] = hs.config.caches.global_factor + stats["event_cache_size"] = 
hs.config.caches.event_cache_size + + # + # Database version + # + + # This only reports info about the *main* database. + stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__ + stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version + + logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats)) + try: + await hs.get_proxied_http_client().put_json( + hs.config.report_stats_endpoint, stats + ) + except Exception as e: + logger.warning("Error reporting stats: %s", e) + + +def start_phone_stats_home(hs): + """ + Start the background tasks which report phone home stats. + """ + clock = hs.get_clock() + + stats = {} + + def performance_stats_init(): + _stats_process.clear() + _stats_process.append( + (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF)) + ) + + def start_phone_stats_home(): + return run_as_background_process( + "phone_stats_home", phone_stats_home, hs, stats + ) + + def generate_user_daily_visit_stats(): + return run_as_background_process( + "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits + ) + + # Rather than update on per session basis, batch up the requests. 
+ # If you increase the loop period, the accuracy of user_daily_visits + # table will decrease + clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) + + # monthly active user limiting functionality + def reap_monthly_active_users(): + return run_as_background_process( + "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users + ) + + clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) + reap_monthly_active_users() + + async def generate_monthly_active_users(): + current_mau_count = 0 + current_mau_count_by_service = {} + reserved_users = () + store = hs.get_datastore() + if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: + current_mau_count = await store.get_monthly_active_count() + current_mau_count_by_service = ( + await store.get_monthly_active_count_by_service() + ) + reserved_users = await store.get_registered_reserved_users() + current_mau_gauge.set(float(current_mau_count)) + + for app_service, count in current_mau_count_by_service.items(): + current_mau_by_service_gauge.labels(app_service).set(float(count)) + + registered_reserved_users_mau_gauge.set(float(len(reserved_users))) + max_mau_gauge.set(float(hs.config.max_mau_value)) + + def start_generate_monthly_active_users(): + return run_as_background_process( + "generate_monthly_active_users", generate_monthly_active_users + ) + + if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: + start_generate_monthly_active_users() + clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) + # End of monthly active user settings + + if hs.config.report_stats: + logger.info("Scheduling stats reporting for 3 hour intervals") + clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) + + # We need to defer this init for the cases that we daemonize + # otherwise the process ID we get is that of the non-daemon process + clock.call_later(0, performance_stats_init) + + # We wait 5 minutes to send the first set of stats as the server can + # be 
quite busy the first few minutes + clock.call_later(5 * 60, start_phone_stats_home) diff --git a/tests/test_phone_home.py b/tests/test_phone_home.py index 7657bddea5be..e7aed092c275 100644 --- a/tests/test_phone_home.py +++ b/tests/test_phone_home.py @@ -17,7 +17,7 @@ import mock -from synapse.app.homeserver import phone_stats_home +from synapse.app.phone_stats_home import phone_stats_home from tests.unittest import HomeserverTestCase From cb740cf0ac27fd9611a2c6e7b258b01f3e6b4fa0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 30 Sep 2020 11:13:35 -0400 Subject: [PATCH 18/19] Move around some storage classes to make the proper metrics methods available to workers. --- synapse/app/generic_worker.py | 2 + synapse/storage/databases/main/__init__.py | 191 ----------------- synapse/storage/databases/main/metrics.py | 195 ++++++++++++++++++ .../databases/main/monthly_active_users.py | 109 +++++----- synapse/storage/databases/main/room.py | 24 +-- 5 files changed, 264 insertions(+), 257 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index e81bf8af4961..367ec2c57eb3 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -129,6 +129,7 @@ from synapse.server import HomeServer, cache_in_self from synapse.storage.databases.main.censor_events import CensorEventsStore from synapse.storage.databases.main.media_repository import MediaRepositoryStore +from synapse.storage.databases.main.metrics import ServerMetricsStore from synapse.storage.databases.main.monthly_active_users import ( MonthlyActiveUsersWorkerStore, ) @@ -479,6 +480,7 @@ class GenericWorkerSlavedStore( SlavedFilteringStore, MonthlyActiveUsersWorkerStore, MediaRepositoryStore, + ServerMetricsStore, SearchWorkerStore, BaseSlavedStore, ): diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index ccb3384db9d2..e1b128db70f8 100644 --- a/synapse/storage/databases/main/__init__.py +++ 
b/synapse/storage/databases/main/__init__.py @@ -15,9 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import calendar import logging -import time from typing import Any, Dict, List, Optional, Tuple from synapse.api.constants import PresenceState @@ -262,9 +260,6 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._stream_order_on_start = self.get_room_max_stream_ordering() self._min_stream_order_on_start = self.get_room_min_stream_ordering() - # Used in _generate_user_daily_visits to keep track of progress - self._last_user_visit_update = self._get_start_of_day() - def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() @@ -295,192 +290,6 @@ def _get_active_presence(self, db_conn): return [UserPresenceState(**row) for row in rows] - async def count_daily_users(self) -> int: - """ - Counts the number of users who used this homeserver in the last 24 hours. - """ - yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24) - return await self.db_pool.runInteraction( - "count_daily_users", self._count_users, yesterday - ) - - async def count_monthly_users(self) -> int: - """ - Counts the number of users who used this homeserver in the last 30 days. - Note this method is intended for phonehome metrics only and is different - from the mau figure in synapse.storage.monthly_active_users which, - amongst other things, includes a 3 day grace period before a user counts. - """ - thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) - return await self.db_pool.runInteraction( - "count_monthly_users", self._count_users, thirty_days_ago - ) - - def _count_users(self, txn, time_from): - """ - Returns number of users seen in the past time_from period - """ - sql = """ - SELECT COALESCE(count(*), 0) FROM ( - SELECT user_id FROM user_ips - WHERE last_seen > ? 
- GROUP BY user_id - ) u - """ - txn.execute(sql, (time_from,)) - (count,) = txn.fetchone() - return count - - async def count_r30_users(self) -> Dict[str, int]: - """ - Counts the number of 30 day retained users, defined as:- - * Users who have created their accounts more than 30 days ago - * Where last seen at most 30 days ago - * Where account creation and last_seen are > 30 days apart - - Returns: - A mapping of counts globally as well as broken out by platform. - """ - - def _count_r30_users(txn): - thirty_days_in_secs = 86400 * 30 - now = int(self._clock.time()) - thirty_days_ago_in_secs = now - thirty_days_in_secs - - sql = """ - SELECT platform, COALESCE(count(*), 0) FROM ( - SELECT - users.name, platform, users.creation_ts * 1000, - MAX(uip.last_seen) - FROM users - INNER JOIN ( - SELECT - user_id, - last_seen, - CASE - WHEN user_agent LIKE '%%Android%%' THEN 'android' - WHEN user_agent LIKE '%%iOS%%' THEN 'ios' - WHEN user_agent LIKE '%%Electron%%' THEN 'electron' - WHEN user_agent LIKE '%%Mozilla%%' THEN 'web' - WHEN user_agent LIKE '%%Gecko%%' THEN 'web' - ELSE 'unknown' - END - AS platform - FROM user_ips - ) uip - ON users.name = uip.user_id - AND users.appservice_id is NULL - AND users.creation_ts < ? - AND uip.last_seen/1000 > ? - AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30 - GROUP BY users.name, platform, users.creation_ts - ) u GROUP BY platform - """ - - results = {} - txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) - - for row in txn: - if row[0] == "unknown": - pass - results[row[0]] = row[1] - - sql = """ - SELECT COALESCE(count(*), 0) FROM ( - SELECT users.name, users.creation_ts * 1000, - MAX(uip.last_seen) - FROM users - INNER JOIN ( - SELECT - user_id, - last_seen - FROM user_ips - ) uip - ON users.name = uip.user_id - AND appservice_id is NULL - AND users.creation_ts < ? - AND uip.last_seen/1000 > ? 
- AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30 - GROUP BY users.name, users.creation_ts - ) u - """ - - txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) - - (count,) = txn.fetchone() - results["all"] = count - - return results - - return await self.db_pool.runInteraction("count_r30_users", _count_r30_users) - - def _get_start_of_day(self): - """ - Returns millisecond unixtime for start of UTC day. - """ - now = time.gmtime() - today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0)) - return today_start * 1000 - - async def generate_user_daily_visits(self) -> None: - """ - Generates daily visit data for use in cohort/ retention analysis - """ - - def _generate_user_daily_visits(txn): - logger.info("Calling _generate_user_daily_visits") - today_start = self._get_start_of_day() - a_day_in_milliseconds = 24 * 60 * 60 * 1000 - now = self.clock.time_msec() - - sql = """ - INSERT INTO user_daily_visits (user_id, device_id, timestamp) - SELECT u.user_id, u.device_id, ? - FROM user_ips AS u - LEFT JOIN ( - SELECT user_id, device_id, timestamp FROM user_daily_visits - WHERE timestamp = ? - ) udv - ON u.user_id = udv.user_id AND u.device_id=udv.device_id - INNER JOIN users ON users.name=u.user_id - WHERE last_seen > ? AND last_seen <= ? - AND udv.timestamp IS NULL AND users.is_guest=0 - AND users.appservice_id IS NULL - GROUP BY u.user_id, u.device_id - """ - - # This means that the day has rolled over but there could still - # be entries from the previous day. There is an edge case - # where if the user logs in at 23:59 and overwrites their - # last_seen at 00:01 then they will not be counted in the - # previous day's stats - it is important that the query is run - # often to minimise this case. 
- if today_start > self._last_user_visit_update: - yesterday_start = today_start - a_day_in_milliseconds - txn.execute( - sql, - ( - yesterday_start, - yesterday_start, - self._last_user_visit_update, - today_start, - ), - ) - self._last_user_visit_update = today_start - - txn.execute( - sql, (today_start, today_start, self._last_user_visit_update, now) - ) - # Update _last_user_visit_update to now. The reason to do this - # rather just clamping to the beginning of the day is to limit - # the size of the join - meaning that the query can be run more - # frequently - self._last_user_visit_update = now - - await self.db_pool.runInteraction( - "generate_user_daily_visits", _generate_user_daily_visits - ) - async def get_users(self) -> List[Dict[str, Any]]: """Function to retrieve a list of users in users table. diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 686052bd83c0..c35c65bec398 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -12,8 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import calendar +import logging +import time import typing from collections import Counter +from typing import Dict from synapse.metrics import BucketCollector from synapse.metrics.background_process_metrics import run_as_background_process @@ -23,6 +27,8 @@ EventPushActionsWorkerStore, ) +logger = logging.getLogger(__name__) + class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): """Functions to pull various metrics from the DB, for e.g. 
phone home @@ -54,6 +60,9 @@ def read_forward_extremities(): hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + # Used in _generate_user_daily_visits to keep track of progress + self._last_user_visit_update = self._get_start_of_day() + async def _read_forward_extremities(self): def fetch(txn): txn.execute( @@ -120,3 +129,189 @@ def _count(txn): return count return await self.db_pool.runInteraction("count_daily_active_rooms", _count) + + async def count_daily_users(self) -> int: + """ + Counts the number of users who used this homeserver in the last 24 hours. + """ + yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24) + return await self.db_pool.runInteraction( + "count_daily_users", self._count_users, yesterday + ) + + async def count_monthly_users(self) -> int: + """ + Counts the number of users who used this homeserver in the last 30 days. + Note this method is intended for phonehome metrics only and is different + from the mau figure in synapse.storage.monthly_active_users which, + amongst other things, includes a 3 day grace period before a user counts. + """ + thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) + return await self.db_pool.runInteraction( + "count_monthly_users", self._count_users, thirty_days_ago + ) + + def _count_users(self, txn, time_from): + """ + Returns number of users seen in the past time_from period + """ + sql = """ + SELECT COALESCE(count(*), 0) FROM ( + SELECT user_id FROM user_ips + WHERE last_seen > ? + GROUP BY user_id + ) u + """ + txn.execute(sql, (time_from,)) + (count,) = txn.fetchone() + return count + + async def count_r30_users(self) -> Dict[str, int]: + """ + Counts the number of 30 day retained users, defined as:- + * Users who have created their accounts more than 30 days ago + * Where last seen at most 30 days ago + * Where account creation and last_seen are > 30 days apart + + Returns: + A mapping of counts globally as well as broken out by platform. 
+ """ + + def _count_r30_users(txn): + thirty_days_in_secs = 86400 * 30 + now = int(self._clock.time()) + thirty_days_ago_in_secs = now - thirty_days_in_secs + + sql = """ + SELECT platform, COALESCE(count(*), 0) FROM ( + SELECT + users.name, platform, users.creation_ts * 1000, + MAX(uip.last_seen) + FROM users + INNER JOIN ( + SELECT + user_id, + last_seen, + CASE + WHEN user_agent LIKE '%%Android%%' THEN 'android' + WHEN user_agent LIKE '%%iOS%%' THEN 'ios' + WHEN user_agent LIKE '%%Electron%%' THEN 'electron' + WHEN user_agent LIKE '%%Mozilla%%' THEN 'web' + WHEN user_agent LIKE '%%Gecko%%' THEN 'web' + ELSE 'unknown' + END + AS platform + FROM user_ips + ) uip + ON users.name = uip.user_id + AND users.appservice_id is NULL + AND users.creation_ts < ? + AND uip.last_seen/1000 > ? + AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30 + GROUP BY users.name, platform, users.creation_ts + ) u GROUP BY platform + """ + + results = {} + txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) + + for row in txn: + if row[0] == "unknown": + pass + results[row[0]] = row[1] + + sql = """ + SELECT COALESCE(count(*), 0) FROM ( + SELECT users.name, users.creation_ts * 1000, + MAX(uip.last_seen) + FROM users + INNER JOIN ( + SELECT + user_id, + last_seen + FROM user_ips + ) uip + ON users.name = uip.user_id + AND appservice_id is NULL + AND users.creation_ts < ? + AND uip.last_seen/1000 > ? + AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30 + GROUP BY users.name, users.creation_ts + ) u + """ + + txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) + + (count,) = txn.fetchone() + results["all"] = count + + return results + + return await self.db_pool.runInteraction("count_r30_users", _count_r30_users) + + def _get_start_of_day(self): + """ + Returns millisecond unixtime for start of UTC day. 
+ """ + now = time.gmtime() + today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0)) + return today_start * 1000 + + async def generate_user_daily_visits(self) -> None: + """ + Generates daily visit data for use in cohort/ retention analysis + """ + + def _generate_user_daily_visits(txn): + logger.info("Calling _generate_user_daily_visits") + today_start = self._get_start_of_day() + a_day_in_milliseconds = 24 * 60 * 60 * 1000 + now = self._clock.time_msec() + + sql = """ + INSERT INTO user_daily_visits (user_id, device_id, timestamp) + SELECT u.user_id, u.device_id, ? + FROM user_ips AS u + LEFT JOIN ( + SELECT user_id, device_id, timestamp FROM user_daily_visits + WHERE timestamp = ? + ) udv + ON u.user_id = udv.user_id AND u.device_id=udv.device_id + INNER JOIN users ON users.name=u.user_id + WHERE last_seen > ? AND last_seen <= ? + AND udv.timestamp IS NULL AND users.is_guest=0 + AND users.appservice_id IS NULL + GROUP BY u.user_id, u.device_id + """ + + # This means that the day has rolled over but there could still + # be entries from the previous day. There is an edge case + # where if the user logs in at 23:59 and overwrites their + # last_seen at 00:01 then they will not be counted in the + # previous day's stats - it is important that the query is run + # often to minimise this case. + if today_start > self._last_user_visit_update: + yesterday_start = today_start - a_day_in_milliseconds + txn.execute( + sql, + ( + yesterday_start, + yesterday_start, + self._last_user_visit_update, + today_start, + ), + ) + self._last_user_visit_update = today_start + + txn.execute( + sql, (today_start, today_start, self._last_user_visit_update, now) + ) + # Update _last_user_visit_update to now. 
The reason to do this + # rather just clamping to the beginning of the day is to limit + # the size of the join - meaning that the query can be run more + # frequently + self._last_user_visit_update = now + + await self.db_pool.runInteraction( + "generate_user_daily_visits", _generate_user_daily_visits + ) diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index e0cedd1aacc9..6587696b3d69 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -32,6 +32,9 @@ def __init__(self, database: DatabasePool, db_conn, hs): self._clock = hs.get_clock() self.hs = hs + self._limit_usage_by_mau = hs.config.limit_usage_by_mau + self._max_mau_value = hs.config.max_mau_value + @cached(num_args=0) async def get_monthly_active_count(self) -> int: """Generates current count of monthly active users @@ -117,60 +120,6 @@ async def user_last_seen_monthly_active(self, user_id: str) -> int: desc="user_last_seen_monthly_active", ) - -class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - - self._limit_usage_by_mau = hs.config.limit_usage_by_mau - self._mau_stats_only = hs.config.mau_stats_only - self._max_mau_value = hs.config.max_mau_value - - # Do not add more reserved users than the total allowable number - # cur = LoggingTransaction( - self.db_pool.new_transaction( - db_conn, - "initialise_mau_threepids", - [], - [], - self._initialise_reserved_users, - hs.config.mau_limits_reserved_threepids[: self._max_mau_value], - ) - - def _initialise_reserved_users(self, txn, threepids): - """Ensures that reserved threepids are accounted for in the MAU table, should - be called on start up. - - Args: - txn (cursor): - threepids (list[dict]): List of threepid dicts to reserve - """ - - # XXX what is this function trying to achieve? 
It upserts into - # monthly_active_users for each *registered* reserved mau user, but why? - # - # - shouldn't there already be an entry for each reserved user (at least - # if they have been active recently)? - # - # - if it's important that the timestamp is kept up to date, why do we only - # run this at startup? - - for tp in threepids: - user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"]) - - if user_id: - is_support = self.is_support_user_txn(txn, user_id) - if not is_support: - # We do this manually here to avoid hitting #6791 - self.db_pool.simple_upsert_txn( - txn, - table="monthly_active_users", - keyvalues={"user_id": user_id}, - values={"timestamp": int(self._clock.time_msec())}, - ) - else: - logger.warning("mau limit reserved threepid %s not found in db" % tp) - async def reap_monthly_active_users(self): """Cleans out monthly active user table to ensure that no stale entries exist. @@ -250,6 +199,58 @@ def _reap_users(txn, reserved_users): "reap_monthly_active_users", _reap_users, reserved_users ) + +class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + self._mau_stats_only = hs.config.mau_stats_only + + # Do not add more reserved users than the total allowable number + # cur = LoggingTransaction( + self.db_pool.new_transaction( + db_conn, + "initialise_mau_threepids", + [], + [], + self._initialise_reserved_users, + hs.config.mau_limits_reserved_threepids[: self._max_mau_value], + ) + + def _initialise_reserved_users(self, txn, threepids): + """Ensures that reserved threepids are accounted for in the MAU table, should + be called on start up. + + Args: + txn (cursor): + threepids (list[dict]): List of threepid dicts to reserve + """ + + # XXX what is this function trying to achieve? It upserts into + # monthly_active_users for each *registered* reserved mau user, but why? 
+ # + # - shouldn't there already be an entry for each reserved user (at least + # if they have been active recently)? + # + # - if it's important that the timestamp is kept up to date, why do we only + # run this at startup? + + for tp in threepids: + user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"]) + + if user_id: + is_support = self.is_support_user_txn(txn, user_id) + if not is_support: + # We do this manually here to avoid hitting #6791 + self.db_pool.simple_upsert_txn( + txn, + table="monthly_active_users", + keyvalues={"user_id": user_id}, + values={"timestamp": int(self._clock.time_msec())}, + ) + else: + logger.warning("mau limit reserved threepid %s not found in db" % tp) + async def upsert_monthly_active_user(self, user_id: str) -> None: """Updates or inserts the user into the monthly active user table, which is used to track the current MAU usage of the server diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index bd6f9553c60c..df69e2c031b8 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -192,6 +192,18 @@ def _count_public_rooms_txn(txn): "count_public_rooms", _count_public_rooms_txn ) + async def get_room_count(self) -> int: + """Retrieve the total number of rooms. + """ + + def f(txn): + sql = "SELECT count(*) FROM rooms" + txn.execute(sql) + row = txn.fetchone() + return row[0] or 0 + + return await self.db_pool.runInteraction("get_rooms", f) + async def get_largest_public_rooms( self, network_tuple: Optional[ThirdPartyInstanceID], @@ -1292,18 +1304,6 @@ def set_room_is_public_appservice_txn(txn, next_id): ) self.hs.get_notifier().on_new_replication_data() - async def get_room_count(self) -> int: - """Retrieve the total number of rooms. 
- """ - - def f(txn): - sql = "SELECT count(*) FROM rooms" - txn.execute(sql) - row = txn.fetchone() - return row[0] or 0 - - return await self.db_pool.runInteraction("get_rooms", f) - async def add_event_report( self, room_id: str, From 179d8c3a5d03bcaca44691303ae48c0b76aba6c0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 30 Sep 2020 13:35:55 -0400 Subject: [PATCH 19/19] Consolidate logic for starting metrics calls. --- synapse/app/_base.py | 6 ++++++ synapse/app/admin_cmd.py | 1 + synapse/app/generic_worker.py | 6 ------ synapse/app/homeserver.py | 6 ------ 4 files changed, 7 insertions(+), 12 deletions(-) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index fb476ddaf571..8bb0b142ca5b 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -28,6 +28,7 @@ import synapse from synapse.app import check_bind_error +from synapse.app.phone_stats_home import start_phone_stats_home from synapse.config.server import ListenerConfig from synapse.crypto import context_factory from synapse.logging.context import PreserveLoggingContext @@ -274,6 +275,11 @@ def handle_sighup(*args, **kwargs): setup_sentry(hs) setup_sdnotify(hs) + # If background tasks are running on the main process, start collecting the + # phone home stats. + if hs.config.run_background_tasks: + start_phone_stats_home(hs) + # We now freeze all allocated objects in the hopes that (almost) # everything currently allocated are things that will be used for the # rest of time. Doing so means less work each GC (hopefully). 
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 7d309b1bb00a..f0d65d08d72d 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -208,6 +208,7 @@ def start(config_options): # Explicitly disable background processes config.update_user_directory = False + config.run_background_tasks = False config.start_pushers = False config.send_federation = False diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 367ec2c57eb3..fc5188ce95ed 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -34,7 +34,6 @@ SERVER_KEY_V2_PREFIX, ) from synapse.app import _base -from synapse.app.phone_stats_home import start_phone_stats_home from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging @@ -978,11 +977,6 @@ def start(config_options): hs.setup() - # If background tasks are running on this worker, start collecting the phone - # home stats. - if hs.config.run_background_tasks: - start_phone_stats_home(hs) - # Ensure the replication streamer is always started in case we write to any # streams. Will no-op if no streams can be written to by this worker. 
hs.get_replication_streamer() diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 6b57d3024db3..4ed4a2c2533a 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -41,7 +41,6 @@ ) from synapse.app import _base from synapse.app._base import listen_ssl, listen_tcp, quit_with_error -from synapse.app.phone_stats_home import start_phone_stats_home from synapse.config._base import ConfigError from synapse.config.emailconfig import ThreepidBehaviour from synapse.config.homeserver import HomeServerConfig @@ -491,11 +490,6 @@ def profiled(*args, **kargs): ThreadPool._worker = profile(ThreadPool._worker) reactor.run = profile(reactor.run) - # If background tasks are running on the main process, start collecting the - # phone home stats. - if hs.config.run_background_tasks: - start_phone_stats_home(hs) - _base.start_reactor( "synapse-homeserver", soft_file_limit=hs.config.soft_file_limit,