This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix handling of stream tokens for push #8943

Merged · 5 commits · Dec 15, 2020
1 change: 1 addition & 0 deletions changelog.d/8943.misc
@@ -0,0 +1 @@
+Add type hints to push module.
19 changes: 14 additions & 5 deletions synapse/push/__init__.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import abc
-from typing import TYPE_CHECKING, Any, Dict, Optional
+from typing import TYPE_CHECKING, Any, Dict
 
 from synapse.types import RoomStreamToken
 
@@ -36,12 +36,21 @@ def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any]):
         # This is the highest stream ordering we know it's safe to process.
         # When new events arrive, we'll be given a window of new events: we
         # should honour this rather than just looking for anything higher
-        # because of potential out-of-order event serialisation. This starts
-        # off as None though as we don't know any better.
-        self.max_stream_ordering = None  # type: Optional[int]
+        # because of potential out-of-order event serialisation.
+        self.max_stream_ordering = self.store.get_room_max_stream_ordering()
 
-    @abc.abstractmethod
     def on_new_notifications(self, max_token: RoomStreamToken) -> None:
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_ordering = max_token.stream
+
+        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+        self._start_processing()
+
+    @abc.abstractmethod
+    def _start_processing(self):
+        """Start processing push notifications."""
         raise NotImplementedError()
 
     @abc.abstractmethod
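The "minimum stream ordering" comment in the hunk above can be made concrete. A RoomStreamToken bundles a scalar stream position with optional per-writer (vector clock) positions, and the push code deliberately consumes only the scalar part. A minimal sketch, assuming the attrs-based RoomStreamToken from synapse.types with (topological, stream) fields; the numeric value is made up for illustration:

    from synapse.types import RoomStreamToken

    # Topological part unset, stream ordering 1234; any per-writer vector
    # clock entries on the token are simply ignored by the pushers.
    max_token = RoomStreamToken(None, 1234)
    max_stream_ordering = max_token.stream  # -> 1234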
16 changes: 0 additions & 16 deletions synapse/push/emailpusher.py
@@ -22,7 +22,6 @@
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import Pusher
 from synapse.push.mailer import Mailer
-from synapse.types import RoomStreamToken
 
 if TYPE_CHECKING:
     from synapse.app.homeserver import HomeServer
@@ -93,20 +92,6 @@ def on_stop(self) -> None:
                 pass
             self.timed_call = None
 
-    def on_new_notifications(self, max_token: RoomStreamToken) -> None:
-        # We just use the minimum stream ordering and ignore the vector clock
-        # component. This is safe to do as long as we *always* ignore the vector
-        # clock components.
-        max_stream_ordering = max_token.stream
-
-        if self.max_stream_ordering:
-            self.max_stream_ordering = max(
-                max_stream_ordering, self.max_stream_ordering
-            )
-        else:
-            self.max_stream_ordering = max_stream_ordering
-        self._start_processing()
-
     def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
         # We could wake up and cancel the timer but there tend to be quite a
         # lot of read receipts so it's probably less work to just let the
@@ -172,7 +157,6 @@ async def _unsafe_process(self) -> None:
         being run.
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
-        assert self.max_stream_ordering is not None
         unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
             self.user_id, start, self.max_stream_ordering
         )
17 changes: 1 addition & 16 deletions synapse/push/httppusher.py
@@ -26,7 +26,6 @@
 from synapse.logging import opentracing
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import Pusher, PusherConfigException
-from synapse.types import RoomStreamToken
 
 from . import push_rule_evaluator, push_tools
 
@@ -122,17 +121,6 @@ def on_started(self, should_check_for_notifs: bool) -> None:
         if should_check_for_notifs:
             self._start_processing()
 
-    def on_new_notifications(self, max_token: RoomStreamToken) -> None:
-        # We just use the minimum stream ordering and ignore the vector clock
-        # component. This is safe to do as long as we *always* ignore the vector
-        # clock components.
-        max_stream_ordering = max_token.stream
-
-        self.max_stream_ordering = max(
-            max_stream_ordering, self.max_stream_ordering or 0
-        )
-        self._start_processing()
-
     def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
         # Note that the min here shouldn't be relied upon to be accurate.
 
@@ -192,10 +180,7 @@ async def _unsafe_process(self) -> None:
         Never call this directly: use _process which will only allow this to
         run once per pusher.
         """
-
-        fn = self.store.get_unread_push_actions_for_user_in_range_for_http
-        assert self.max_stream_ordering is not None
-        unprocessed = await fn(
+        unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
             self.user_id, self.last_stream_ordering, self.max_stream_ordering
         )
 
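With the on_new_notifications overrides deleted from both the email and HTTP pushers above, the stream-token bookkeeping lives only in the base class, and the asserts guarding against max_stream_ordering being None could be dropped because the base constructor now initialises it to an int. A minimal, illustrative sketch of what a concrete pusher is left to implement (DemoPusher and its logging body are hypothetical, and the other abstract hooks on Pusher are omitted for brevity):

    import logging

    from synapse.push import Pusher

    logger = logging.getLogger(__name__)

    class DemoPusher(Pusher):
        def _start_processing(self) -> None:
            # By the time this hook runs, the base class has already folded
            # max_token.stream into self.max_stream_ordering.
            logger.info("processing pushes up to %s", self.max_stream_ordering)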
5 changes: 2 additions & 3 deletions synapse/push/pusherpool.py
@@ -129,9 +129,8 @@ async def add_pusher(
         )
 
         # create the pusher setting last_stream_ordering to the current maximum
-        # stream ordering in event_push_actions, so it will process
-        # pushes from this point onwards.
-        last_stream_ordering = await self.store.get_latest_push_action_stream_ordering()
+        # stream ordering, so it will process pushes from this point onwards.
+        last_stream_ordering = self.store.get_room_max_stream_ordering()
 
         await self.store.add_pusher(
             user_id=user_id,
10 changes: 0 additions & 10 deletions synapse/storage/databases/main/event_push_actions.py
@@ -894,16 +894,6 @@ def f(txn):
             pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
         return push_actions
 
-    async def get_latest_push_action_stream_ordering(self):
-        def f(txn):
-            txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
-            return txn.fetchone()
-
-        result = await self.db_pool.runInteraction(
-            "get_latest_push_action_stream_ordering", f
-        )
-        return result[0] or 0
-
     def _remove_old_push_actions_before_txn(
         self, txn, room_id, user_id, stream_ordering
     ):
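Taken together with the pusherpool change above, a newly created pusher's last_stream_ordering now comes from the event stream's tracked maximum rather than a MAX(stream_ordering) query over event_push_actions, which is why the storage helper above could be removed. A rough before/after sketch, assuming a store object shaped like Synapse's data store (illustrative, not the PR's code verbatim):

    def starting_stream_ordering(store) -> int:
        # Before this PR: an awaited SQL query over event_push_actions, i.e.
        #   await store.get_latest_push_action_stream_ordering()
        # After: the stream position already tracked in memory; a plain
        # synchronous call, which is why the `await` disappears in the diff.
        return store.get_room_max_stream_ordering()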