From 6a48d97dc71b76c386ca4600f4a0338ada9b4d26 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 13 Sep 2022 16:44:10 +0100 Subject: [PATCH 01/17] Add experimental config option --- synapse/config/experimental.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 702b81e636c9..f4541a8db03c 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -93,3 +93,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: # MSC3852: Expose last seen user agent field on /_matrix/client/v3/devices. self.msc3852_enabled: bool = experimental.get("msc3852_enabled", False) + + # MSC3881: Remotely toggle push notifications for another client + self.msc3881_enabled: bool = experimental.get("msc3881_enabled", False) From 69bcce03ead5e65931fc3007c3a0c109a121dbbc Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 13 Sep 2022 16:57:47 +0100 Subject: [PATCH 02/17] Database support --- synapse/storage/databases/main/pusher.py | 13 +++++++++---- .../main/delta/72/07add_pusher_enabled.sql | 16 ++++++++++++++++ 2 files changed, 25 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/schema/main/delta/72/07add_pusher_enabled.sql diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index bd0cfa7f3211..e4f373dfe454 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -119,19 +119,22 @@ async def get_pushers_by(self, keyvalues: Dict[str, Any]) -> Iterator[PusherConf "last_stream_ordering", "last_success", "failing_since", + "enabled", ], desc="get_pushers_by", ) return self._decode_pushers_rows(ret) - async def get_all_pushers(self) -> Iterator[PusherConfig]: - def get_pushers(txn: LoggingTransaction) -> Iterator[PusherConfig]: - txn.execute("SELECT * FROM pushers") + async def get_enabled_pushers(self) -> Iterator[PusherConfig]: + def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]: + txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)") rows = self.db_pool.cursor_to_dict(txn) return self._decode_pushers_rows(rows) - return await self.db_pool.runInteraction("get_all_pushers", get_pushers) + return await self.db_pool.runInteraction( + "get_enabled_pushers", get_enabled_pushers_txn + ) async def get_all_updated_pushers_rows( self, instance_name: str, last_id: int, current_id: int, limit: int @@ -476,6 +479,7 @@ async def add_pusher( data: Optional[JsonDict], last_stream_ordering: int, profile_tag: str = "", + enabled: bool = True, ) -> None: async with self._pushers_id_gen.get_next() as stream_id: # no need to lock because `pushers` has a unique key on @@ -494,6 +498,7 @@ async def add_pusher( "last_stream_ordering": last_stream_ordering, "profile_tag": profile_tag, "id": stream_id, + "enabled": enabled, }, desc="add_pusher", lock=False, diff --git a/synapse/storage/schema/main/delta/72/07add_pusher_enabled.sql b/synapse/storage/schema/main/delta/72/07add_pusher_enabled.sql new file mode 100644 index 000000000000..dba3b4900b59 --- /dev/null +++ b/synapse/storage/schema/main/delta/72/07add_pusher_enabled.sql @@ -0,0 +1,16 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE pushers ADD COLUMN enabled BOOLEAN; \ No newline at end of file From 19180a3f92a857703ea8e96b7cb33f8cd74dfcf4 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 13 Sep 2022 17:05:10 +0100 Subject: [PATCH 03/17] Add support for enabling/disabling pushers in the pusher pool --- synapse/push/__init__.py | 2 ++ synapse/push/pusherpool.py | 50 ++++++++++++++++++++++--------- synapse/replication/tcp/client.py | 10 +++++-- 3 files changed, 45 insertions(+), 17 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 57c4d70466b6..ac99d35a7ea0 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -116,6 +116,7 @@ class PusherConfig: last_stream_ordering: int last_success: Optional[int] failing_since: Optional[int] + enabled: bool def as_dict(self) -> Dict[str, Any]: """Information that can be retrieved about a pusher after creation.""" @@ -128,6 +129,7 @@ def as_dict(self) -> Dict[str, Any]: "lang": self.lang, "profile_tag": self.profile_tag, "pushkey": self.pushkey, + "enabled": self.enabled, } diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 1e0ef44fc786..52b6bdae682e 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -106,6 +106,7 @@ async def add_pusher( lang: Optional[str], data: JsonDict, profile_tag: str = "", + enabled: bool = True, ) -> Optional[Pusher]: """Creates a new pusher and adds it to the pool @@ -147,6 +148,7 @@ async def add_pusher( last_stream_ordering=last_stream_ordering, last_success=None, failing_since=None, + enabled=enabled, ) ) @@ -163,8 +165,9 @@ async def add_pusher( data=data, last_stream_ordering=last_stream_ordering, profile_tag=profile_tag, + enabled=enabled, ) - pusher = await self.start_pusher_by_id(app_id, pushkey, user_id) + pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id) return pusher @@ -276,10 +279,13 @@ async def on_new_receipts( except Exception: logger.exception("Exception in pusher on_new_receipts") - async def start_pusher_by_id( + async def process_pusher_change_by_id( self, app_id: str, pushkey: str, user_id: str ) -> Optional[Pusher]: - """Look up the details for the given pusher, and start it + """Look up the details for the given pusher, and either start it if its + "enabled" flag is True, or try to stop it otherwise. + + If the pusher is new and its "enabled" flag is False, the stop is a noop. Returns: The pusher started, if any @@ -297,6 +303,10 @@ async def start_pusher_by_id( if r.user_name == user_id: pusher_config = r + if pusher_config and not pusher_config.enabled: + self.maybe_stop_pusher(app_id, pushkey, user_id) + return None + pusher = None if pusher_config: pusher = await self._start_pusher(pusher_config) @@ -305,7 +315,7 @@ async def start_pusher_by_id( async def _start_pushers(self) -> None: """Start all the pushers""" - pushers = await self.store.get_all_pushers() + pushers = await self.store.get_enabled_pushers() # Stagger starting up the pushers so we don't completely drown the # process on start up. @@ -363,6 +373,8 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]: synapse_pushers.labels(type(pusher).__name__, pusher.app_id).inc() + logger.info("Starting pusher %s / %s", pusher.user_id, appid_pushkey) + # Check if there *may* be push to process. We do this as this check is a # lot cheaper to do than actually fetching the exact rows we need to # push. @@ -382,16 +394,7 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]: return pusher async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None: - appid_pushkey = "%s:%s" % (app_id, pushkey) - - byuser = self.pushers.get(user_id, {}) - - if appid_pushkey in byuser: - logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) - pusher = byuser.pop(appid_pushkey) - pusher.on_stop() - - synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec() + self.maybe_stop_pusher(app_id, pushkey, user_id) # We can only delete pushers on master. if self._remove_pusher_client: @@ -402,3 +405,22 @@ async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None: await self.store.delete_pusher_by_app_id_pushkey_user_id( app_id, pushkey, user_id ) + + def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None: + """Stops a pusher with the given app ID and push key if one is running. + + Args: + app_id: the pusher's app ID. + pushkey: the pusher's push key. + user_id: the user the pusher belongs to. Only used for logging. + """ + appid_pushkey = "%s:%s" % (app_id, pushkey) + + byuser = self.pushers.get(user_id, {}) + + if appid_pushkey in byuser: + logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) + pusher = byuser.pop(appid_pushkey) + pusher.on_stop() + + synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec() diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e4f2201c922f..cf9cd6833ba9 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -189,7 +189,9 @@ async def on_rdata( if row.deleted: self.stop_pusher(row.user_id, row.app_id, row.pushkey) else: - await self.start_pusher(row.user_id, row.app_id, row.pushkey) + await self.process_pusher_change( + row.user_id, row.app_id, row.pushkey + ) elif stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. @@ -334,13 +336,15 @@ def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None: logger.info("Stopping pusher %r / %r", user_id, key) pusher.on_stop() - async def start_pusher(self, user_id: str, app_id: str, pushkey: str) -> None: + async def process_pusher_change( + self, user_id: str, app_id: str, pushkey: str + ) -> None: if not self._notify_pushers: return key = "%s:%s" % (app_id, pushkey) logger.info("Starting pusher %r / %r", user_id, key) - await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id) + await self._pusher_pool.process_pusher_change_by_id(app_id, pushkey, user_id) class FederationSenderHandler: From 8ff18017650c25a044fb0a0f90d628f35a9520f1 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 13 Sep 2022 17:06:32 +0100 Subject: [PATCH 04/17] Add MSC3881 support for pushers endpoints --- synapse/rest/client/pusher.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/rest/client/pusher.py b/synapse/rest/client/pusher.py index 9a1f10f4befe..48e5d1d017d3 100644 --- a/synapse/rest/client/pusher.py +++ b/synapse/rest/client/pusher.py @@ -42,6 +42,7 @@ def __init__(self, hs: "HomeServer"): super().__init__() self.hs = hs self.auth = hs.get_auth() + self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) @@ -53,6 +54,11 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: filtered_pushers = [p.as_dict() for p in pushers] + for pusher in filtered_pushers: + if self._msc3881_enabled: + pusher["org.matrix.msc3881.enabled"] = pusher["enabled"] + del pusher["enabled"] + return 200, {"pushers": filtered_pushers} @@ -65,6 +71,7 @@ def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() self.notifier = hs.get_notifier() self.pusher_pool = self.hs.get_pusherpool() + self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) @@ -103,6 +110,10 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: if "append" in content: append = content["append"] + enabled = True + if self._msc3881_enabled and "org.matrix.msc3881.enabled" in content: + enabled = content["org.matrix.msc3881.enabled"] + if not append: await self.pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user( app_id=content["app_id"], @@ -122,6 +133,7 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: lang=content["lang"], data=content["data"], profile_tag=content.get("profile_tag", ""), + enabled=enabled, ) except PusherConfigException as pce: raise SynapseError( From ebee86c67d12d71f0e40ac2849a5fc9ebcd4e4f1 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 14 Sep 2022 17:33:19 +0100 Subject: [PATCH 05/17] Tests --- tests/push/test_http.py | 81 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 77 insertions(+), 4 deletions(-) diff --git a/tests/push/test_http.py b/tests/push/test_http.py index d9c68cdd2d22..8681e0b1a7b3 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -20,7 +20,7 @@ import synapse.rest.admin from synapse.logging.context import make_deferred_yieldable from synapse.push import PusherConfigException -from synapse.rest.client import login, push_rule, receipts, room +from synapse.rest.client import login, push_rule, pusher, receipts, room from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -35,6 +35,7 @@ class HTTPPusherTests(HomeserverTestCase): login.register_servlets, receipts.register_servlets, push_rule.register_servlets, + pusher.register_servlets, ] user_id = True hijack_auth = False @@ -728,11 +729,31 @@ def _send_read_request( ) self.assertEqual(channel.code, 200, channel.json_body) - def _make_user_with_pusher(self, username: str) -> Tuple[str, str]: + def _make_user_with_pusher( + self, username: str, enabled: bool = True + ) -> Tuple[str, str]: + """Registers a user and creates a pusher for them. + + Args: + username: the localpart of the new user's Matrix ID. + enabled: whether to create the pusher in an enabled or disabled state. + """ user_id = self.register_user(username, "pass") access_token = self.login(username, "pass") # Register the pusher + self._set_pusher(user_id, access_token, enabled) + + return user_id, access_token + + def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None: + """Creates or updates the pusher for the given user. + + Args: + user_id: the user's Matrix ID. + access_token: the access token associated with the pusher. + enabled: whether to enable or disable the pusher. + """ user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) @@ -749,11 +770,10 @@ def _make_user_with_pusher(self, username: str) -> Tuple[str, str]: pushkey="a@example.com", lang=None, data={"url": "http://example.com/_matrix/push/v1/notify"}, + enabled=enabled, ) ) - return user_id, access_token - def test_dont_notify_rule_overrides_message(self) -> None: """ The override push rule will suppress notification @@ -791,3 +811,56 @@ def test_dont_notify_rule_overrides_message(self) -> None: # The user sends a message back (sends a notification) self.helper.send(room, body="Hello", tok=access_token) self.assertEqual(len(self.push_attempts), 1) + + @override_config({"experimental_features": {"msc3881_enabled": True}}) + def test_disable(self) -> None: + """Tests that disabling a pusher means it's not pushed to anymore.""" + user_id, access_token = self._make_user_with_pusher("user") + other_user_id, other_access_token = self._make_user_with_pusher("otheruser") + + room = self.helper.create_room_as(user_id, tok=access_token) + self.helper.join(room=room, user=other_user_id, tok=other_access_token) + + # Send a message and check that it generated a push. + self.helper.send(room, body="Hi!", tok=other_access_token) + self.assertEqual(len(self.push_attempts), 1) + + # Disable the pusher. + self._set_pusher(user_id, access_token, enabled=False) + + # Send another message and check that it did not generate a push. + self.helper.send(room, body="Hi!", tok=other_access_token) + self.assertEqual(len(self.push_attempts), 1) + + # Get the pushers for the user and check that it is marked as disabled. + channel = self.make_request("GET", "/pushers", access_token=access_token) + self.assertEqual(channel.code, 200) + self.assertEqual(len(channel.json_body["pushers"]), 1) + self.assertFalse(channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"]) + + @override_config({"experimental_features": {"msc3881_enabled": True}}) + def test_enable(self) -> None: + """Tests that enabling a disabled pusher means it gets pushed to.""" + # Create the user with the pusher already disabled. + user_id, access_token = self._make_user_with_pusher("user", enabled=False) + other_user_id, other_access_token = self._make_user_with_pusher("otheruser") + + room = self.helper.create_room_as(user_id, tok=access_token) + self.helper.join(room=room, user=other_user_id, tok=other_access_token) + + # Send a message and check that it did not generate a push. + self.helper.send(room, body="Hi!", tok=other_access_token) + self.assertEqual(len(self.push_attempts), 0) + + # Enable the pusher. + self._set_pusher(user_id, access_token, enabled=True) + + # Send another message and check that it did generate a push. + self.helper.send(room, body="Hi!", tok=other_access_token) + self.assertEqual(len(self.push_attempts), 1) + + # Get the pushers for the user and check that it is marked as enabled. + channel = self.make_request("GET", "/pushers", access_token=access_token) + self.assertEqual(channel.code, 200) + self.assertEqual(len(channel.json_body["pushers"]), 1) + self.assertFalse(channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"]) From 9c72ec5375d3f80832dbd114490f9f090b257d53 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 14 Sep 2022 17:37:43 +0100 Subject: [PATCH 06/17] Changelog --- changelog.d/13799.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13799.feature diff --git a/changelog.d/13799.feature b/changelog.d/13799.feature new file mode 100644 index 000000000000..ea341f0bec06 --- /dev/null +++ b/changelog.d/13799.feature @@ -0,0 +1 @@ +Add experimental support for enabling or disabling individual pushers (as a partial implementation of [MSC3881](https://github.com/matrix-org/matrix-spec-proposals/pull/3881)). From fa5c959cfb42146d9a090af2cd932161418b5298 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 15 Sep 2022 10:22:54 +0100 Subject: [PATCH 07/17] Always send the enabled field as a boolean --- synapse/rest/client/pusher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/pusher.py b/synapse/rest/client/pusher.py index 48e5d1d017d3..a5092604443c 100644 --- a/synapse/rest/client/pusher.py +++ b/synapse/rest/client/pusher.py @@ -56,7 +56,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: for pusher in filtered_pushers: if self._msc3881_enabled: - pusher["org.matrix.msc3881.enabled"] = pusher["enabled"] + pusher["org.matrix.msc3881.enabled"] = bool(pusher["enabled"]) del pusher["enabled"] return 200, {"pushers": filtered_pushers} From 4518fb385167f037cc51831bbc46158000e60824 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 15 Sep 2022 10:23:08 +0100 Subject: [PATCH 08/17] Fix test --- tests/push/test_http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/push/test_http.py b/tests/push/test_http.py index 8681e0b1a7b3..194a7b081f83 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -863,4 +863,4 @@ def test_enable(self) -> None: channel = self.make_request("GET", "/pushers", access_token=access_token) self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["pushers"]), 1) - self.assertFalse(channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"]) + self.assertTrue(channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"]) From 7f19d5beebf6a9d386acd87f1f5c28850f6fca40 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 15 Sep 2022 10:28:00 +0100 Subject: [PATCH 09/17] Move db delta to right schema version --- .../{72/07add_pusher_enabled.sql => 73/02add_pusher_enabled.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename synapse/storage/schema/main/delta/{72/07add_pusher_enabled.sql => 73/02add_pusher_enabled.sql} (100%) diff --git a/synapse/storage/schema/main/delta/72/07add_pusher_enabled.sql b/synapse/storage/schema/main/delta/73/02add_pusher_enabled.sql similarity index 100% rename from synapse/storage/schema/main/delta/72/07add_pusher_enabled.sql rename to synapse/storage/schema/main/delta/73/02add_pusher_enabled.sql From 2799644c4ab98d2710410e89d52e2c29c9e76d35 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 16 Sep 2022 16:09:14 +0100 Subject: [PATCH 10/17] Don't overwrite the access token if we're updating an existing pusher --- synapse/push/pusherpool.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 52b6bdae682e..b3024826a954 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -152,6 +152,16 @@ async def add_pusher( ) ) + # Before we actually create the pusher, we check if the user already has one for + # this app ID and pushkey. If so, we want to keep the access token in place, + # since this could be one device modifying (e.g. enabling/disabling) another + # device's pusher. + existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey( + user_id, app_id, pushkey + ) + if existing_config: + access_token = existing_config.access_token + await self.store.add_pusher( user_id=user_id, access_token=access_token, @@ -279,6 +289,18 @@ async def on_new_receipts( except Exception: logger.exception("Exception in pusher on_new_receipts") + async def _get_pusher_config_for_user_by_app_id_and_pushkey( + self, user_id: str, app_id: str, pushkey: str + ) -> Optional[PusherConfig]: + resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey) + + pusher_config = None + for r in resultlist: + if r.user_name == user_id: + pusher_config = r + + return pusher_config + async def process_pusher_change_by_id( self, app_id: str, pushkey: str, user_id: str ) -> Optional[Pusher]: @@ -296,12 +318,9 @@ async def process_pusher_change_by_id( if not self._pusher_shard_config.should_handle(self._instance_name, user_id): return None - resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey) - - pusher_config = None - for r in resultlist: - if r.user_name == user_id: - pusher_config = r + pusher_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey( + user_id, app_id, pushkey + ) if pusher_config and not pusher_config.enabled: self.maybe_stop_pusher(app_id, pushkey, user_id) From e28d0081ddb75fc3bc7e841ecd09916b850be115 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 16 Sep 2022 17:16:49 +0100 Subject: [PATCH 11/17] Make sure pushers with no value for 'enabled' are considered enabled Also move the cast to bool to a more logical location --- synapse/rest/client/pusher.py | 2 +- synapse/storage/databases/main/pusher.py | 13 +++++++++++++ tests/push/test_http.py | 22 +++++++++++++++++++++- 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/synapse/rest/client/pusher.py b/synapse/rest/client/pusher.py index a5092604443c..48e5d1d017d3 100644 --- a/synapse/rest/client/pusher.py +++ b/synapse/rest/client/pusher.py @@ -56,7 +56,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: for pusher in filtered_pushers: if self._msc3881_enabled: - pusher["org.matrix.msc3881.enabled"] = bool(pusher["enabled"]) + pusher["org.matrix.msc3881.enabled"] = pusher["enabled"] del pusher["enabled"] return 200, {"pushers": filtered_pushers} diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index e4f373dfe454..199d8f7cdb2c 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -89,6 +89,19 @@ def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]: ) continue + # Pushers created while support for the 'enabled' field is not active + # (either because they were created before said support existed or because + # they were created while the experimental implementation is turned off) + # will have the 'enabled' column set to NULL, which needs to be interpreted + # as True. + if r["enabled"] is None: + r["enabled"] = True + + # If we're using SQLite, then boolean values are integers. This is + # troublesome since some code using the return value of this method might + # expect it to be a boolean, or will expose it to clients (in responses). + r["enabled"] = bool(r["enabled"]) + yield PusherConfig(**r) async def get_pushers_by_app_id_and_pushkey( diff --git a/tests/push/test_http.py b/tests/push/test_http.py index 194a7b081f83..790ab464edbf 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -836,7 +836,10 @@ def test_disable(self) -> None: channel = self.make_request("GET", "/pushers", access_token=access_token) self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["pushers"]), 1) - self.assertFalse(channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"]) + + enabled = channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"] + self.assertFalse(enabled) + self.assertTrue(isinstance(enabled, bool)) @override_config({"experimental_features": {"msc3881_enabled": True}}) def test_enable(self) -> None: @@ -860,6 +863,23 @@ def test_enable(self) -> None: self.assertEqual(len(self.push_attempts), 1) # Get the pushers for the user and check that it is marked as enabled. + channel = self.make_request("GET", "/pushers", access_token=access_token) + self.assertEqual(channel.code, 200) + self.assertEqual(len(channel.json_body["pushers"]), 1) + + enabled = channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"] + self.assertTrue(enabled) + self.assertTrue(isinstance(enabled, bool)) + + @override_config({"experimental_features": {"msc3881_enabled": True}}) + def test_null_enabled(self) -> None: + """Tests that a pusher that has an 'enabled' column set to NULL (eg pushers + created before the column was introduced) is considered enabled. + """ + # We intentionally set 'enabled' to None so that it's stored as NULL in the + # database. + user_id, access_token = self._make_user_with_pusher("user", enabled=None) # type: ignore[arg-type] + channel = self.make_request("GET", "/pushers", access_token=access_token) self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["pushers"]), 1) From 0f021734460a5eb067b62aa9ded82dc01e55a410 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 16 Sep 2022 17:17:07 +0100 Subject: [PATCH 12/17] Add the column to port_db --- synapse/_scripts/synapse_port_db.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 30983c47fbb7..450ba462ba11 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -111,6 +111,7 @@ "e2e_fallback_keys_json": ["used"], "access_tokens": ["used"], "device_lists_changes_in_room": ["converted_to_destinations"], + "pushers": ["enabled"], } From effda598ab642df7562ae5fd8fb130e39f12b18d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 16 Sep 2022 17:17:48 +0100 Subject: [PATCH 13/17] Renamed confusing variable --- synapse/rest/client/pusher.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/pusher.py b/synapse/rest/client/pusher.py index 48e5d1d017d3..14638e7c74f2 100644 --- a/synapse/rest/client/pusher.py +++ b/synapse/rest/client/pusher.py @@ -52,14 +52,14 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: user.to_string() ) - filtered_pushers = [p.as_dict() for p in pushers] + pusher_dicts = [p.as_dict() for p in pushers] - for pusher in filtered_pushers: + for pusher in pusher_dicts: if self._msc3881_enabled: pusher["org.matrix.msc3881.enabled"] = pusher["enabled"] del pusher["enabled"] - return 200, {"pushers": filtered_pushers} + return 200, {"pushers": pusher_dicts} class PushersSetRestServlet(RestServlet): From abbb8fcd990c1ef9c08c48fc66f88640502b5c3b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 20 Sep 2022 11:44:47 +0100 Subject: [PATCH 14/17] Inline get_pushers_by That way we can add a COALESCE in the query --- synapse/storage/databases/main/pusher.py | 60 ++++++++++++------------ 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 199d8f7cdb2c..7860b973968e 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -89,14 +89,6 @@ def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]: ) continue - # Pushers created while support for the 'enabled' field is not active - # (either because they were created before said support existed or because - # they were created while the experimental implementation is turned off) - # will have the 'enabled' column set to NULL, which needs to be interpreted - # as True. - if r["enabled"] is None: - r["enabled"] = True - # If we're using SQLite, then boolean values are integers. This is # troublesome since some code using the return value of this method might # expect it to be a boolean, or will expose it to clients (in responses). @@ -113,29 +105,39 @@ async def get_pushers_by_user_id(self, user_id: str) -> Iterator[PusherConfig]: return await self.get_pushers_by({"user_name": user_id}) async def get_pushers_by(self, keyvalues: Dict[str, Any]) -> Iterator[PusherConfig]: - ret = await self.db_pool.simple_select_list( - "pushers", - keyvalues, - [ - "id", - "user_name", - "access_token", - "profile_tag", - "kind", - "app_id", - "app_display_name", - "device_display_name", - "pushkey", - "ts", - "lang", - "data", - "last_stream_ordering", - "last_success", - "failing_since", - "enabled", - ], + """Retrieve pushers that match the given criteria. + + Args: + keyvalues: A {column: value} dictionary. + + Returns: + The pushers for which the given columns have the given values. + """ + def get_pushers_by_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + # We could technically use simple_select_list here, but we need to call + # COALESCE on the 'enabled' column. While it is technically possible to give + # simple_select_list the whole `COALESCE(...) AS ...` as a column name, it + # feels a bit hacky, so it's probably better to just inline the query. + sql = """ + SELECT + id, user_name, access_token, profile_tag, kind, app_id, + app_display_name, device_display_name, pushkey, ts, lang, data, + last_stream_ordering, last_success, failing_since, + COALESCE(enabled, TRUE) AS enabled + FROM pushers + """ + + sql += "WHERE %s" % (" AND ".join("%s = ?" % (k,) for k in keyvalues),) + + txn.execute(sql, list(keyvalues.values())) + + return self.db_pool.cursor_to_dict(txn) + + ret = await self.db_pool.runInteraction( desc="get_pushers_by", + func=get_pushers_by_txn, ) + return self._decode_pushers_rows(ret) async def get_enabled_pushers(self) -> Iterator[PusherConfig]: From c5e779ed9412ebe3f96fe2b3be793b4c5e55339f Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 20 Sep 2022 12:03:50 +0100 Subject: [PATCH 15/17] Rename add_pusher and add a test to ensure the access token does not change --- synapse/handlers/register.py | 4 +- synapse/push/pusherpool.py | 2 +- synapse/rest/admin/users.py | 4 +- synapse/rest/client/pusher.py | 2 +- synapse/storage/databases/main/pusher.py | 1 + tests/push/test_email.py | 4 +- tests/push/test_http.py | 47 +++++++++++++++++++----- tests/replication/test_pusher_shard.py | 2 +- tests/rest/admin/test_user.py | 2 +- 9 files changed, 49 insertions(+), 19 deletions(-) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 20ec22105a34..cfcadb34db85 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -997,7 +997,7 @@ async def _register_email_threepid( assert user_tuple token_id = user_tuple.token_id - await self.pusher_pool.add_pusher( + await self.pusher_pool.add_or_update_pusher( user_id=user_id, access_token=token_id, kind="email", @@ -1005,7 +1005,7 @@ async def _register_email_threepid( app_display_name="Email Notifications", device_display_name=threepid["address"], pushkey=threepid["address"], - lang=None, # We don't know a user's language here + lang=None, data={}, ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index b3024826a954..7156152dd8c4 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -94,7 +94,7 @@ def start(self) -> None: return run_as_background_process("start_pushers", self._start_pushers) - async def add_pusher( + async def add_or_update_pusher( self, user_id: str, access_token: Optional[int], diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 78ee9b6532b6..6aed5f78dbde 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -375,7 +375,7 @@ async def on_PUT( and self.hs.config.email.email_notif_for_new_users and medium == "email" ): - await self.pusher_pool.add_pusher( + await self.pusher_pool.add_or_update_pusher( user_id=user_id, access_token=None, kind="email", @@ -383,7 +383,7 @@ async def on_PUT( app_display_name="Email Notifications", device_display_name=address, pushkey=address, - lang=None, # We don't know a user's language here + lang=None, data={}, ) diff --git a/synapse/rest/client/pusher.py b/synapse/rest/client/pusher.py index 14638e7c74f2..c9f76125dc87 100644 --- a/synapse/rest/client/pusher.py +++ b/synapse/rest/client/pusher.py @@ -122,7 +122,7 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: ) try: - await self.pusher_pool.add_pusher( + await self.pusher_pool.add_or_update_pusher( user_id=user.to_string(), access_token=requester.access_token_id, kind=content["kind"], diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 7860b973968e..ee55b8c4a93e 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -113,6 +113,7 @@ async def get_pushers_by(self, keyvalues: Dict[str, Any]) -> Iterator[PusherConf Returns: The pushers for which the given columns have the given values. """ + def get_pushers_by_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: # We could technically use simple_select_list here, but we need to call # COALESCE on the 'enabled' column. While it is technically possible to give diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 7a3b0d675592..fd14568f55ce 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -114,7 +114,7 @@ def prepare(self, reactor, clock, hs): ) self.pusher = self.get_success( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=self.user_id, access_token=self.token_id, kind="email", @@ -136,7 +136,7 @@ def test_need_validated_email(self): """ with self.assertRaises(SynapseError) as cm: self.get_success_or_raise( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=self.user_id, access_token=self.token_id, kind="email", diff --git a/tests/push/test_http.py b/tests/push/test_http.py index 790ab464edbf..af67d8446349 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -19,7 +19,7 @@ import synapse.rest.admin from synapse.logging.context import make_deferred_yieldable -from synapse.push import PusherConfigException +from synapse.push import PusherConfig, PusherConfigException from synapse.rest.client import login, push_rule, pusher, receipts, room from synapse.server import HomeServer from synapse.types import JsonDict @@ -75,7 +75,7 @@ def test_invalid_configuration(self) -> None: def test_data(data: Optional[JsonDict]) -> None: self.get_failure( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, access_token=token_id, kind="http", @@ -120,7 +120,7 @@ def test_sends_http(self) -> None: token_id = user_tuple.token_id self.get_success( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, access_token=token_id, kind="http", @@ -236,7 +236,7 @@ def test_sends_high_priority_for_encrypted(self) -> None: token_id = user_tuple.token_id self.get_success( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, access_token=token_id, kind="http", @@ -356,7 +356,7 @@ def test_sends_high_priority_for_one_to_one_only(self) -> None: token_id = user_tuple.token_id self.get_success( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, access_token=token_id, kind="http", @@ -442,7 +442,7 @@ def test_sends_high_priority_for_mention(self) -> None: token_id = user_tuple.token_id self.get_success( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, access_token=token_id, kind="http", @@ -519,7 +519,7 @@ def test_sends_high_priority_for_atroom(self) -> None: token_id = user_tuple.token_id self.get_success( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, access_token=token_id, kind="http", @@ -625,7 +625,7 @@ def _test_push_unread_count(self) -> None: token_id = user_tuple.token_id self.get_success( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, access_token=token_id, kind="http", @@ -760,7 +760,7 @@ def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None: token_id = user_tuple.token_id self.get_success( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, access_token=token_id, kind="http", @@ -884,3 +884,32 @@ def test_null_enabled(self) -> None: self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["pushers"]), 1) self.assertTrue(channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"]) + + def test_update_different_device_access_token(self) -> None: + """Tests that if we create a pusher from one device, the update it from another + device, the access token associated with the pusher stays the same. + """ + # Create a user with a pusher. + user_id, access_token = self._make_user_with_pusher("user") + + # Get the token ID for the current access token, since that's what we store in + # the pushers table. + user_tuple = self.get_success( + self.hs.get_datastores().main.get_user_by_access_token(access_token) + ) + token_id = user_tuple.token_id + + # Generate a new access token, and update the pusher with it. + new_token = self.login("user", "pass") + self._set_pusher(user_id, new_token, enabled=False) + + # Get the current list of pushers for the user. + ret = self.get_success( + self.hs.get_datastores().main.get_pushers_by({"user_name": user_id}) + ) + pushers: List[PusherConfig] = list(ret) + + # Check that we still have one pusher, and that the access token associated with + # it didn't change. + self.assertEqual(len(pushers), 1) + self.assertEqual(pushers[0].access_token, token_id) diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py index 8f4f6688ce86..59fea93e490e 100644 --- a/tests/replication/test_pusher_shard.py +++ b/tests/replication/test_pusher_shard.py @@ -55,7 +55,7 @@ def _create_pusher_and_send_msg(self, localpart): token_id = user_dict.token_id self.get_success( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, access_token=token_id, kind="http", diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index ec5ccf6fcad0..7ed1bc5de7e7 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -2839,7 +2839,7 @@ def test_get_pushers(self) -> None: token_id = user_tuple.token_id self.get_success( - self.hs.get_pusherpool().add_pusher( + self.hs.get_pusherpool().add_or_update_pusher( user_id=self.other_user, access_token=token_id, kind="http", From 4d6a70acd992dfe50857d9ba0465f95c48334da2 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 21 Sep 2022 14:54:39 +0100 Subject: [PATCH 16/17] Rephrase comment --- synapse/push/pusherpool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 7156152dd8c4..2597898cf48f 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -152,8 +152,8 @@ async def add_or_update_pusher( ) ) - # Before we actually create the pusher, we check if the user already has one for - # this app ID and pushkey. If so, we want to keep the access token in place, + # Before we actually persist the pusher, we check if the user already has one + # for this app ID and pushkey. If so, we want to keep the access token in place, # since this could be one device modifying (e.g. enabling/disabling) another # device's pusher. existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey( From 8a55286657a7141db0a8d989dc6f2113958f2187 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 21 Sep 2022 15:04:47 +0100 Subject: [PATCH 17/17] Standardise changelog So we can merge it with the other PR --- changelog.d/13799.feature | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/13799.feature b/changelog.d/13799.feature index ea341f0bec06..6c8e5cffe281 100644 --- a/changelog.d/13799.feature +++ b/changelog.d/13799.feature @@ -1 +1 @@ -Add experimental support for enabling or disabling individual pushers (as a partial implementation of [MSC3881](https://github.com/matrix-org/matrix-spec-proposals/pull/3881)). +Add experimental support for [MSC3881: Remotely toggle push notifications for another client](https://github.com/matrix-org/matrix-spec-proposals/pull/3881).