From d630d967b671dd7583db61578a5091fdf4db1629 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2020 14:07:07 +0100 Subject: [PATCH 1/4] Add mypy to notifier --- synapse/notifier.py | 28 +++++++++++++++------------- synapse/server.pyi | 6 ++++++ tox.ini | 1 + 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 22ab4a9da525..80c05f506446 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -15,7 +15,7 @@ import logging from collections import namedtuple -from typing import Callable, Iterable, List, TypeVar +from typing import Callable, Dict, Iterable, List, Set, Tuple, TypeVar from prometheus_client import Counter @@ -24,12 +24,13 @@ import synapse.server from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError +from synapse.events import EventBase from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import PreserveLoggingContext from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import StreamToken +from synapse.types import Collection, StreamToken from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client @@ -159,14 +160,16 @@ class Notifier(object): UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 def __init__(self, hs: "synapse.server.HomeServer"): - self.user_to_user_stream = {} - self.room_to_user_streams = {} + self.user_to_user_stream = {} # type: Dict[str, _NotifierUserStream] + self.room_to_user_streams = {} # type: Dict[str, Set[_NotifierUserStream]] self.hs = hs self.storage = hs.get_storage() self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() - self.pending_new_room_events = [] + self.pending_new_room_events = ( + [] + ) # type: List[Tuple[int, EventBase, Collection[str]]] # Called when there are new things to stream over replication self.replication_callbacks = [] # type: List[Callable[[], None]] @@ -178,10 +181,9 @@ def __init__(self, hs: "synapse.server.HomeServer"): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self.federation_sender = None if hs.should_send_federation(): self.federation_sender = hs.get_federation_sender() - else: - self.federation_sender = None self.state_handler = hs.get_state_handler() @@ -193,12 +195,12 @@ def __init__(self, hs: "synapse.server.HomeServer"): # when rendering the metrics page, which is likely once per minute at # most when scraping it. def count_listeners(): - all_user_streams = set() + all_user_streams = set() # type: Set[_NotifierUserStream] - for x in list(self.room_to_user_streams.values()): - all_user_streams |= x - for x in list(self.user_to_user_stream.values()): - all_user_streams.add(x) + for streams in list(self.room_to_user_streams.values()): + all_user_streams |= streams + for stream in list(self.user_to_user_stream.values()): + all_user_streams.add(stream) return sum(stream.count_listeners() for stream in all_user_streams) @@ -408,7 +410,7 @@ async def check_for_updates(before_token, after_token): if not after_token.is_after(before_token): return EventStreamResult([], (from_token, from_token)) - events = [] + events = [] # type: List[EventBase] end_token = from_token for name, source in self.event_sources.sources.items(): diff --git a/synapse/server.pyi b/synapse/server.pyi index 1aba408c2164..6a9bbb471353 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -31,8 +31,10 @@ import synapse.server_notices.server_notices_sender import synapse.state import synapse.storage from synapse.events.builder import EventBuilderFactory +from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.typing import FollowerTypingHandler from synapse.replication.tcp.streams import Stream +from synapse.streams.events import EventSources class HomeServer(object): @property @@ -153,3 +155,7 @@ class HomeServer(object): pass def get_typing_handler(self) -> FollowerTypingHandler: pass + def get_event_sources(self) -> EventSources: + pass + def get_application_service_handler(self): + return ApplicationServicesHandler(self) diff --git a/tox.ini b/tox.ini index 9a052c1e339a..54f28990cea0 100644 --- a/tox.ini +++ b/tox.ini @@ -198,6 +198,7 @@ commands = mypy \ synapse/logging/ \ synapse/metrics \ synapse/module_api \ + synapse/notifier.py \ synapse/push/pusherpool.py \ synapse/push/push_rule_evaluator.py \ synapse/replication \ From 924993acc0a5b8c64522ecd27e7259c1edb12384 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2020 14:24:57 +0100 Subject: [PATCH 2/4] Add typing to function signatures --- synapse/handlers/events.py | 4 -- synapse/notifier.py | 107 ++++++++++++++++++++++++------------- 2 files changed, 70 insertions(+), 41 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 71a89f09c765..1924636c4d70 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -57,13 +57,10 @@ async def get_stream( timeout=0, as_client_event=True, affect_presence=True, - only_keys=None, room_id=None, is_guest=False, ): """Fetches the events stream for a given user. - - If `only_keys` is not None, events from keys will be sent down. """ if room_id: @@ -93,7 +90,6 @@ async def get_stream( auth_user, pagin_config, timeout, - only_keys=only_keys, is_guest=is_guest, explicit_room_id=room_id, ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 80c05f506446..694efe7116ae 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -15,7 +15,17 @@ import logging from collections import namedtuple -from typing import Callable, Dict, Iterable, List, Set, Tuple, TypeVar +from typing import ( + Awaitable, + Callable, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, + TypeVar, +) from prometheus_client import Counter @@ -30,7 +40,8 @@ from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import Collection, StreamToken +from synapse.streams.config import PaginationConfig +from synapse.types import Collection, StreamToken, UserID from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client @@ -78,7 +89,13 @@ class _NotifierUserStream(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user_id, rooms, current_token, time_now_ms): + def __init__( + self, + user_id: str, + rooms: Collection[str], + current_token: StreamToken, + time_now_ms: int, + ): self.user_id = user_id self.rooms = set(rooms) self.current_token = current_token @@ -94,13 +111,13 @@ def __init__(self, user_id, rooms, current_token, time_now_ms): with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) - def notify(self, stream_key, stream_id, time_now_ms): + def notify(self, stream_key: str, stream_id: int, time_now_ms: int): """Notify any listeners for this user of a new event from an event source. Args: - stream_key(str): The stream the event came from. - stream_id(str): The new id for the stream the event came from. - time_now_ms(int): The current time in milliseconds. + stream_key: The stream the event came from. + stream_id: The new id for the stream the event came from. + time_now_ms: The current time in milliseconds. """ self.current_token = self.current_token.copy_and_advance(stream_key, stream_id) self.last_notified_token = self.current_token @@ -113,7 +130,7 @@ def notify(self, stream_key, stream_id, time_now_ms): self.notify_deferred = ObservableDeferred(defer.Deferred()) noify_deferred.callback(self.current_token) - def remove(self, notifier): + def remove(self, notifier: "Notifier"): """ Remove this listener from all the indexes in the Notifier it knows about. """ @@ -124,10 +141,10 @@ def remove(self, notifier): notifier.user_to_user_stream.pop(self.user_id) - def count_listeners(self): + def count_listeners(self) -> int: return len(self.notify_deferred.observers()) - def new_listener(self, token): + def new_listener(self, token: StreamToken) -> _NotificationListener: """Returns a deferred that is resolved when there is a new token greater than the given token. @@ -225,7 +242,11 @@ def add_replication_callback(self, cb: Callable[[], None]): self.replication_callbacks.append(cb) def on_new_room_event( - self, event, room_stream_id, max_room_stream_id, extra_users=[] + self, + event: EventBase, + room_stream_id: int, + max_room_stream_id: int, + extra_users: Collection[str] = [], ): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -243,11 +264,11 @@ def on_new_room_event( self.notify_replication() - def _notify_pending_new_room_events(self, max_room_stream_id): + def _notify_pending_new_room_events(self, max_room_stream_id: int): """Notify for the room events that were queued waiting for a previous event to be persisted. Args: - max_room_stream_id(int): The highest stream_id below which all + max_room_stream_id: The highest stream_id below which all events have been persisted. """ pending = self.pending_new_room_events @@ -260,7 +281,9 @@ def _notify_pending_new_room_events(self, max_room_stream_id): else: self._on_new_room_event(event, room_stream_id, extra_users) - def _on_new_room_event(self, event, room_stream_id, extra_users=[]): + def _on_new_room_event( + self, event: EventBase, room_stream_id: int, extra_users: Collection[str] = [] + ): """Notify any user streams that are interested in this room event""" # poke any interested application service. run_as_background_process( @@ -277,13 +300,19 @@ def _on_new_room_event(self, event, room_stream_id, extra_users=[]): "room_key", room_stream_id, users=extra_users, rooms=[event.room_id] ) - async def _notify_app_services(self, room_stream_id): + async def _notify_app_services(self, room_stream_id: int): try: await self.appservice_handler.notify_interested_services(room_stream_id) except Exception: logger.exception("Error notifying application services of event") - def on_new_event(self, stream_key, new_token, users=[], rooms=[]): + def on_new_event( + self, + stream_key: str, + new_token: int, + users: Collection[str] = [], + rooms: Collection[str] = [], + ): """ Used to inform listeners that something has happened event wise. Will wake up all listeners for the given users and rooms. @@ -309,14 +338,19 @@ def on_new_event(self, stream_key, new_token, users=[], rooms=[]): self.notify_replication() - def on_new_replication_data(self): + def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" self.notify_replication() async def wait_for_events( - self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START - ): + self, + user_id: str, + timeout: int, + callback: Callable[[StreamToken, StreamToken], Awaitable[T]], + room_ids=None, + from_token=StreamToken.START, + ) -> T: """Wait until the callback returns a non empty response or the timeout fires. """ @@ -379,19 +413,16 @@ async def wait_for_events( async def get_events_for( self, - user, - pagination_config, - timeout, - only_keys=None, - is_guest=False, - explicit_room_id=None, - ): + user: UserID, + pagination_config: PaginationConfig, + timeout: int, + is_guest: bool = False, + explicit_room_id: str = None, + ) -> EventStreamResult: """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any new events to happen before returning. - If `only_keys` is not None, events from keys will be sent down. - If explicit_room_id is not set, the user's joined rooms will be polled for events. If explicit_room_id is set, that room will be polled for events only if @@ -406,7 +437,9 @@ async def get_events_for( room_ids, is_joined = await self._get_room_ids(user, explicit_room_id) is_peeking = not is_joined - async def check_for_updates(before_token, after_token): + async def check_for_updates( + before_token: StreamToken, after_token: StreamToken + ) -> EventStreamResult: if not after_token.is_after(before_token): return EventStreamResult([], (from_token, from_token)) @@ -419,8 +452,6 @@ async def check_for_updates(before_token, after_token): after_id = getattr(after_token, keyname) if before_id == after_id: continue - if only_keys and name not in only_keys: - continue new_events, new_key = await source.get_new_events( user=user, @@ -478,7 +509,9 @@ async def check_for_updates(before_token, after_token): return result - async def _get_room_ids(self, user, explicit_room_id): + async def _get_room_ids( + self, user: UserID, explicit_room_id: Optional[str] + ) -> Tuple[Collection[str], bool]: joined_room_ids = await self.store.get_rooms_for_user(user.to_string()) if explicit_room_id: if explicit_room_id in joined_room_ids: @@ -488,7 +521,7 @@ async def _get_room_ids(self, user, explicit_room_id): raise AuthError(403, "Non-joined access not allowed") return joined_room_ids, True - async def _is_world_readable(self, room_id): + async def _is_world_readable(self, room_id: str) -> bool: state = await self.state_handler.get_current_state( room_id, EventTypes.RoomHistoryVisibility, "" ) @@ -498,7 +531,7 @@ async def _is_world_readable(self, room_id): return False @log_function - def remove_expired_streams(self): + def remove_expired_streams(self) -> None: time_now_ms = self.clock.time_msec() expired_streams = [] expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS @@ -512,21 +545,21 @@ def remove_expired_streams(self): expired_stream.remove(self) @log_function - def _register_with_keys(self, user_stream): + def _register_with_keys(self, user_stream: _NotifierUserStream): self.user_to_user_stream[user_stream.user_id] = user_stream for room in user_stream.rooms: s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) - def _user_joined_room(self, user_id, room_id): + def _user_joined_room(self, user_id: str, room_id: str): new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams.add(new_user_stream) new_user_stream.rooms.add(room_id) - def notify_replication(self): + def notify_replication(self) -> None: """Notify the any replication listeners that there's a new event""" for cb in self.replication_callbacks: cb() From bd0cbef656c782148d9d5ef0848b7fd8a1a4410b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2020 14:41:21 +0100 Subject: [PATCH 3/4] Newsfile --- changelog.d/8058.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8058.misc diff --git a/changelog.d/8058.misc b/changelog.d/8058.misc new file mode 100644 index 000000000000..dd30d359174f --- /dev/null +++ b/changelog.d/8058.misc @@ -0,0 +1 @@ +Add typing info to `Notifier`. From d986b78ebac9d09b2c5d7f2a3bec0dd7f073806b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Aug 2020 18:19:58 +0100 Subject: [PATCH 4/4] Update changelog.d/8058.misc Co-authored-by: Patrick Cloke --- changelog.d/8058.misc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/8058.misc b/changelog.d/8058.misc index dd30d359174f..41a27e5d72de 100644 --- a/changelog.d/8058.misc +++ b/changelog.d/8058.misc @@ -1 +1 @@ -Add typing info to `Notifier`. +Add type hints to `Notifier`.