From 934e61e5230c74447a58bd457f8bde1faf922e1b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 15 Dec 2020 09:02:45 -0500 Subject: [PATCH 1/5] Remove assertions and update type hints. --- changelog.d/8943.misc | 1 + synapse/push/emailpusher.py | 1 - synapse/push/httppusher.py | 5 +---- synapse/storage/databases/main/event_push_actions.py | 4 ++-- 4 files changed, 4 insertions(+), 7 deletions(-) create mode 100644 changelog.d/8943.misc diff --git a/changelog.d/8943.misc b/changelog.d/8943.misc new file mode 100644 index 000000000000..4ff0b94b9496 --- /dev/null +++ b/changelog.d/8943.misc @@ -0,0 +1 @@ +Add type hints to push module. diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 64a35c199422..820ee30c5937 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -172,7 +172,6 @@ async def _unsafe_process(self) -> None: being run. """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering - assert self.max_stream_ordering is not None unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email( self.user_id, start, self.max_stream_ordering ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 5408aa12957b..daa50ba430b5 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -192,10 +192,7 @@ async def _unsafe_process(self) -> None: Never call this directly: use _process which will only allow this to run once per pusher. """ - - fn = self.store.get_unread_push_actions_for_user_in_range_for_http - assert self.max_stream_ordering is not None - unprocessed = await fn( + unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http( self.user_id, self.last_stream_ordering, self.max_stream_ordering ) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 2e56dfaf312a..2b10d39f442c 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -209,7 +209,7 @@ async def get_unread_push_actions_for_user_in_range_for_http( self, user_id: str, min_stream_ordering: int, - max_stream_ordering: int, + max_stream_ordering: Optional[int], limit: int = 20, ) -> List[dict]: """Get a list of the most recent unread push actions for a given user, @@ -314,7 +314,7 @@ async def get_unread_push_actions_for_user_in_range_for_email( self, user_id: str, min_stream_ordering: int, - max_stream_ordering: int, + max_stream_ordering: Optional[int], limit: int = 20, ) -> List[dict]: """Get a list of the most recent unread push actions for a given user, From f3b9b221ac0a712265420f4c29e1bd7fa698e422 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 15 Dec 2020 09:37:30 -0500 Subject: [PATCH 2/5] Fetch the stream ordering during initialization. --- synapse/push/__init__.py | 5 ++--- synapse/push/emailpusher.py | 9 +++------ synapse/push/httppusher.py | 2 +- synapse/storage/databases/main/event_push_actions.py | 4 ++-- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 3d2e8748381d..e0c75f20d626 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -36,9 +36,8 @@ def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any]): # This is the highest stream ordering we know it's safe to process. # When new events arrive, we'll be given a window of new events: we # should honour this rather than just looking for anything higher - # because of potential out-of-order event serialisation. This starts - # off as None though as we don't know any better. - self.max_stream_ordering = None # type: Optional[int] + # because of potential out-of-order event serialisation. + self.max_stream_ordering = self.store.get_room_max_stream_ordering() @abc.abstractmethod def on_new_notifications(self, max_token: RoomStreamToken) -> None: diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 820ee30c5937..7efdb6802aa2 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -99,12 +99,9 @@ def on_new_notifications(self, max_token: RoomStreamToken) -> None: # clock components. max_stream_ordering = max_token.stream - if self.max_stream_ordering: - self.max_stream_ordering = max( - max_stream_ordering, self.max_stream_ordering - ) - else: - self.max_stream_ordering = max_stream_ordering + self.max_stream_ordering = max( + max_stream_ordering, self.max_stream_ordering + ) self._start_processing() def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index daa50ba430b5..69441bd16233 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -129,7 +129,7 @@ def on_new_notifications(self, max_token: RoomStreamToken) -> None: max_stream_ordering = max_token.stream self.max_stream_ordering = max( - max_stream_ordering, self.max_stream_ordering or 0 + max_stream_ordering, self.max_stream_ordering ) self._start_processing() diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 2b10d39f442c..2e56dfaf312a 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -209,7 +209,7 @@ async def get_unread_push_actions_for_user_in_range_for_http( self, user_id: str, min_stream_ordering: int, - max_stream_ordering: Optional[int], + max_stream_ordering: int, limit: int = 20, ) -> List[dict]: """Get a list of the most recent unread push actions for a given user, @@ -314,7 +314,7 @@ async def get_unread_push_actions_for_user_in_range_for_email( self, user_id: str, min_stream_ordering: int, - max_stream_ordering: Optional[int], + max_stream_ordering: int, limit: int = 20, ) -> List[dict]: """Get a list of the most recent unread push actions for a given user, From 903d9a8fc93c6d526bbfc56c970abc9b9f9a607a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 15 Dec 2020 09:40:09 -0500 Subject: [PATCH 3/5] Abstract shared code. --- synapse/push/__init__.py | 14 +++++++++++++- synapse/push/emailpusher.py | 11 ----------- synapse/push/httppusher.py | 11 ----------- 3 files changed, 13 insertions(+), 23 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index e0c75f20d626..e70276d47627 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -39,8 +39,20 @@ def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any]): # because of potential out-of-order event serialisation. self.max_stream_ordering = self.store.get_room_max_stream_ordering() - @abc.abstractmethod def on_new_notifications(self, max_token: RoomStreamToken) -> None: + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + + self.max_stream_ordering = max( + max_stream_ordering, self.max_stream_ordering + ) + self._start_processing() + + @abc.abstractmethod + def _start_processing(self): + """Start processing push notifications.""" raise NotImplementedError() @abc.abstractmethod diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 7efdb6802aa2..0bf4f610d4d8 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -93,17 +93,6 @@ def on_stop(self) -> None: pass self.timed_call = None - def on_new_notifications(self, max_token: RoomStreamToken) -> None: - # We just use the minimum stream ordering and ignore the vector clock - # component. This is safe to do as long as we *always* ignore the vector - # clock components. - max_stream_ordering = max_token.stream - - self.max_stream_ordering = max( - max_stream_ordering, self.max_stream_ordering - ) - self._start_processing() - def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: # We could wake up and cancel the timer but there tend to be quite a # lot of read receipts so it's probably less work to just let the diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 69441bd16233..9b3039473bae 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -122,17 +122,6 @@ def on_started(self, should_check_for_notifs: bool) -> None: if should_check_for_notifs: self._start_processing() - def on_new_notifications(self, max_token: RoomStreamToken) -> None: - # We just use the minimum stream ordering and ignore the vector clock - # component. This is safe to do as long as we *always* ignore the vector - # clock components. - max_stream_ordering = max_token.stream - - self.max_stream_ordering = max( - max_stream_ordering, self.max_stream_ordering - ) - self._start_processing() - def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: # Note that the min here shouldn't be relied upon to be accurate. From 8124f55868311e2b6d6e50acf90f69ae43e402f2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 15 Dec 2020 09:46:03 -0500 Subject: [PATCH 4/5] lint. --- synapse/push/__init__.py | 6 ++---- synapse/push/emailpusher.py | 1 - synapse/push/httppusher.py | 1 - 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index e70276d47627..ad07ee86f6fd 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. import abc -from typing import TYPE_CHECKING, Any, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict from synapse.types import RoomStreamToken @@ -45,9 +45,7 @@ def on_new_notifications(self, max_token: RoomStreamToken) -> None: # clock components. max_stream_ordering = max_token.stream - self.max_stream_ordering = max( - max_stream_ordering, self.max_stream_ordering - ) + self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) self._start_processing() @abc.abstractmethod diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 0bf4f610d4d8..11a97b8df439 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -22,7 +22,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import Pusher from synapse.push.mailer import Mailer -from synapse.types import RoomStreamToken if TYPE_CHECKING: from synapse.app.homeserver import HomeServer diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 9b3039473bae..e8b25bcd2a44 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -26,7 +26,6 @@ from synapse.logging import opentracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import Pusher, PusherConfigException -from synapse.types import RoomStreamToken from . import push_rule_evaluator, push_tools From fddb5b18337ce8405cd4d7efc2521d9b694508c6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 15 Dec 2020 09:52:26 -0500 Subject: [PATCH 5/5] Remove a broken implementation of getting the proper stream ordering. --- synapse/push/pusherpool.py | 5 ++--- synapse/storage/databases/main/event_push_actions.py | 10 ---------- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 9fcc0b8a6454..9c12d81cfbf5 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -129,9 +129,8 @@ async def add_pusher( ) # create the pusher setting last_stream_ordering to the current maximum - # stream ordering in event_push_actions, so it will process - # pushes from this point onwards. - last_stream_ordering = await self.store.get_latest_push_action_stream_ordering() + # stream ordering, so it will process pushes from this point onwards. + last_stream_ordering = self.store.get_room_max_stream_ordering() await self.store.add_pusher( user_id=user_id, diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 2e56dfaf312a..e5c03cc60969 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -894,16 +894,6 @@ def f(txn): pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"]) return push_actions - async def get_latest_push_action_stream_ordering(self): - def f(txn): - txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") - return txn.fetchone() - - result = await self.db_pool.runInteraction( - "get_latest_push_action_stream_ordering", f - ) - return result[0] or 0 - def _remove_old_push_actions_before_txn( self, txn, room_id, user_id, stream_ordering ):