diff --git a/changelog.d/8287.bugfix b/changelog.d/8287.bugfix new file mode 100644 index 000000000000..839781aa0753 --- /dev/null +++ b/changelog.d/8287.bugfix @@ -0,0 +1 @@ +Fix edge case where push could get delayed for a user until a later event was pushed. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 43f2986f8955..74d7ac8a67f2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2970,7 +2970,7 @@ async def _notify_persisted_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) + await self.pusher_pool.on_new_notifications(max_stream_id) async def _clean_room_for_join(self, room_id: str) -> None: """Called to clean up any data in DB for a given room, ready for the diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8a7b4916cd6a..d1556659e3e5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1145,7 +1145,7 @@ def is_inviter_member_event(e): # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) - await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) + await self.pusher_pool.on_new_notifications(max_stream_id) def _notify(): try: diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index b7ea4438e082..28bd8ab7481e 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -91,7 +91,7 @@ def on_stop(self): pass self.timed_call = None - def on_new_notifications(self, min_stream_ordering, max_stream_ordering): + def on_new_notifications(self, max_stream_ordering): if self.max_stream_ordering: self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index f21fa9b65905..26706bf3e1ee 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -114,7 +114,7 @@ def on_started(self, should_check_for_notifs): if should_check_for_notifs: self._start_processing() - def on_new_notifications(self, min_stream_ordering, max_stream_ordering): + def on_new_notifications(self, max_stream_ordering): self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering or 0 ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 3c3262a88c53..fa8473bf8d00 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -64,6 +64,12 @@ def __init__(self, hs: "HomeServer"): self._pusher_shard_config = hs.config.push.pusher_shard_config self._instance_name = hs.get_instance_name() + # Record the last stream ID that we were poked about so we can get + # changes since then. We set this to the current max stream ID on + # startup as every individual pusher will have checked for changes on + # startup. + self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering() + # map from user id to app_id:pushkey to pusher self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]] @@ -178,20 +184,27 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens): ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, min_stream_id, max_stream_id): + async def on_new_notifications(self, max_stream_id): if not self.pushers: # nothing to do here. return + if max_stream_id < self._last_room_stream_id_seen: + # Nothing to do + return + + prev_stream_id = self._last_room_stream_id_seen + self._last_room_stream_id_seen = max_stream_id + try: users_affected = await self.store.get_push_action_users_in_range( - min_stream_id, max_stream_id + prev_stream_id, max_stream_id ) for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(min_stream_id, max_stream_id) + p.on_new_notifications(max_stream_id) except Exception: logger.exception("Exception in pusher on_new_notifications") diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index d6ecf5b32703..ccd3147dfdaf 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -154,7 +154,8 @@ async def on_rdata( max_token = self.store.get_room_max_stream_ordering() self.notifier.on_new_room_event(event, token, max_token, extra_users) - await self.pusher_pool.on_new_notifications(token, token) + max_token = self.store.get_room_max_stream_ordering() + await self.pusher_pool.on_new_notifications(max_token) # Notify any waiting deferreds. The list is ordered by position so we # just iterate through the list until we reach a position that is diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index ae6bc24f4cbb..f306a09bfaa7 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -80,6 +80,7 @@ def make_homeserver(self, reactor, clock): "get_user_directory_stream_pos", "get_current_state_deltas", "get_device_updates_by_remote", + "get_room_max_stream_ordering", ] )