From 78911ca46a90597036367155915f63da5cbb4fdd Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Mon, 21 Sep 2020 15:09:31 +0100 Subject: [PATCH 1/7] Appservice API changes --- synapse/appservice/__init__.py | 6 ++++-- synapse/appservice/api.py | 22 ++++++++++++++++++++++ synapse/appservice/scheduler.py | 8 ++++++-- synapse/config/appservice.py | 3 +++ 4 files changed, 35 insertions(+), 4 deletions(-) diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 13ec1f71a64c..8737b34e546e 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -91,6 +91,7 @@ def __init__( protocols=None, rate_limited=True, ip_range_whitelist=None, + supports_ephemeral=False, ): self.token = token self.url = ( @@ -102,6 +103,7 @@ def __init__( self.namespaces = self._check_namespaces(namespaces) self.id = id self.ip_range_whitelist = ip_range_whitelist + self.supports_ephemeral = supports_ephemeral if "|" in self.id: raise Exception("application service ID cannot contain '|' character") @@ -188,11 +190,11 @@ async def _matches_user(self, event, store): if not store: return False - does_match = await self._matches_user_in_member_list(event.room_id, store) + does_match = await self.matches_user_in_member_list(event.room_id, store) return does_match @cached(num_args=1, cache_context=True) - async def _matches_user_in_member_list(self, room_id, store, cache_context): + async def matches_user_in_member_list(self, room_id, store, cache_context): member_list = await store.get_users_in_room( room_id, on_invalidate=cache_context.invalidate ) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 1514c0f69142..48982d2ad398 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -201,6 +201,28 @@ async def _get() -> Optional[JsonDict]: key = (service.id, protocol) return await self.protocol_meta_cache.wrap(key, _get) + async def push_ephemeral(self, service, events): + if service.url is None: + return True + if service.supports_ephemeral is False: + return True + + uri = service.url + ( + "%s/uk.half-shot.appservice/ephemeral" % APP_SERVICE_PREFIX + ) + try: + await self.put_json( + uri=uri, + json_body={"events": events}, + args={"access_token": service.hs_token}, + ) + return True + except CodeMessageException as e: + logger.warning("push_ephemeral to %s received %s", uri, e.code) + except Exception as ex: + logger.warning("push_ephemeral to %s threw exception %s", uri, ex) + return False + async def push_bulk(self, service, events, txn_id=None): if service.url is None: return True diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 8eb8c6f51cf9..74bd09567753 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -85,6 +85,10 @@ async def start(self): def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) + async def submit_ephemeral_events_for_as(self, service, events): + if self.txn_ctrl.is_service_up(service): + await self.as_api.push_ephemeral(service, events) + class _ServiceQueuer: """Queue of events waiting to be sent to appservices. @@ -161,7 +165,7 @@ def __init__(self, clock, store, as_api): async def send(self, service, events): try: txn = await self.store.create_appservice_txn(service=service, events=events) - service_is_up = await self._is_service_up(service) + service_is_up = await self.is_service_up(service) if service_is_up: sent = await txn.send(self.as_api) if sent: @@ -204,7 +208,7 @@ def start_recoverer(self, service): recoverer.recover() logger.info("Now %i active recoverers", len(self.recoverers)) - async def _is_service_up(self, service): + async def is_service_up(self, service): state = await self.store.get_appservice_state(service) return state == ApplicationServiceState.UP or state is None diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 8ed3e2425843..013cd0fd96e2 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -160,6 +160,8 @@ def _load_appservice(hostname, as_info, config_filename): if as_info.get("ip_range_whitelist"): ip_range_whitelist = IPSet(as_info.get("ip_range_whitelist")) + supports_ephemeral = as_info.get("uk.half-shot.appservice.push_ephemeral", False) + return ApplicationService( token=as_info["as_token"], hostname=hostname, @@ -168,6 +170,7 @@ def _load_appservice(hostname, as_info, config_filename): hs_token=as_info["hs_token"], sender=user_id, id=as_info["id"], + supports_ephemeral=supports_ephemeral, protocols=protocols, rate_limited=rate_limited, ip_range_whitelist=ip_range_whitelist, From ae724db89986938db60d187db1ef1ab92f7e7753 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Mon, 21 Sep 2020 15:10:06 +0100 Subject: [PATCH 2/7] Changes to handlers to support fetching events for appservices --- synapse/handlers/appservice.py | 49 ++++++++++++++++++++ synapse/handlers/receipts.py | 22 +++++++++ synapse/handlers/typing.py | 19 ++++++++ synapse/storage/databases/main/receipts.py | 53 ++++++++++++++++++++++ 4 files changed, 143 insertions(+) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 9d4e87dad6a6..e8cc166fdeac 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -20,6 +20,20 @@ from twisted.internet import defer import synapse +from typing import ( + Awaitable, + Callable, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, + TypeVar, + Union, +) + +from synapse.types import RoomStreamToken from synapse.api.constants import EventTypes from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import ( @@ -43,6 +57,7 @@ def __init__(self, hs): self.started_scheduler = False self.clock = hs.get_clock() self.notify_appservices = hs.config.notify_appservices + self.event_sources = hs.get_event_sources() self.current_max = 0 self.is_processing = False @@ -158,6 +173,40 @@ async def handle_room_events(events): finally: self.is_processing = False + async def notify_interested_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken]): + services = [service for service in self.store.get_app_services() if service.supports_ephemeral] + if not services or not self.notify_appservices: + return + logger.info("Checking interested services for %s" % (stream_key)) + with Measure(self.clock, "notify_interested_services_ephemeral"): + for service in services: + events = [] + if stream_key == "typing_key": + from_key = new_token - 1 + typing_source = self.event_sources.sources["typing"] + # Get the typing events from just before current + typing, _typing_key = await typing_source.get_new_events_as( + service=service, + from_key=from_key + ) + events = typing + elif stream_key == "receipt_key": + from_key = new_token - 1 + receipts_source = self.event_sources.sources["receipt"] + receipts, _receipts_key = await receipts_source.get_new_events_as( + service=service, + from_key=from_key + ) + events = receipts + elif stream_key == "presence": + # TODO: This. Presence means trying to determine all the + # users the appservice cares about, which means checking + # all the rooms the appservice is in. + if events: + # TODO: Do in background? + await self.scheduler.submit_ephemeral_events_for_as(service, events) + + async def query_user_exists(self, user_id): """Check if any application service knows this user_id exists. diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 722592375796..d9e4b1c27154 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -140,5 +140,27 @@ async def get_new_events(self, from_key, room_ids, **kwargs): return (events, to_key) + async def get_new_events_as(self, from_key, service, **kwargs): + from_key = int(from_key) + to_key = self.get_current_key() + + if from_key == to_key: + return [], to_key + + # We first need to fetch all new receipts + rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms( + from_key=from_key, to_key=to_key + ) + + # Then filter down to rooms that the AS can read + events = [] + for room_id, event in rooms_to_events.items(): + if not await service.matches_user_in_member_list(room_id, self.store): + continue + + events.append(event) + + return (events, to_key) + def get_current_key(self, direction="f"): return self.store.get_max_receipt_stream_id() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3cbfc2d780a7..1747e4c872ab 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, List, Set, Tuple from synapse.api.errors import AuthError, ShadowBanError, SynapseError +from synapse.appservice import ApplicationService from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.streams import TypingStream from synapse.types import UserID, get_domain_from_id @@ -430,6 +431,24 @@ def _make_event_for(self, room_id): "content": {"user_ids": list(typing)}, } + async def get_new_events_as(self, from_key, service, **kwargs): + with Measure(self.clock, "typing.get_new_events_as"): + from_key = int(from_key) + handler = self.get_typing_handler() + + events = [] + for room_id in handler._room_serials.keys(): + if handler._room_serials[room_id] <= from_key: + print("Key too old") + continue + # XXX: Store gut wrenching + if not await service.matches_user_in_member_list(room_id, handler.store): + continue + + events.append(self._make_event_for(room_id)) + + return (events, handler._latest_room_serial) + async def get_new_events(self, from_key, room_ids, **kwargs): with Measure(self.clock, "typing.get_new_events"): from_key = int(from_key) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index f880b5e562cc..5867d52b62fd 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -123,6 +123,15 @@ def f(txn): for row in rows } + async def get_linearized_receipts_for_all_rooms( + self, to_key: int, from_key: Optional[int] = None + ) -> List[dict]: + results = await self._get_linearized_receipts_for_all_rooms( + to_key, from_key=from_key + ) + + return results + async def get_linearized_receipts_for_rooms( self, room_ids: List[str], to_key: int, from_key: Optional[int] = None ) -> List[dict]: @@ -274,6 +283,50 @@ def f(txn): } return results + @cached( + num_args=2, + ) + async def _get_linearized_receipts_for_all_rooms(self, to_key, from_key=None): + def f(txn): + if from_key: + sql = """ + SELECT * FROM receipts_linearized WHERE + stream_id > ? AND stream_id <= ? + """ + txn.execute(sql, [from_key, to_key]) + else: + sql = """ + SELECT * FROM receipts_linearized WHERE + stream_id <= ? + """ + + txn.execute(sql, [to_key]) + + return self.db_pool.cursor_to_dict(txn) + + txn_results = await self.db_pool.runInteraction( + "_get_linearized_receipts_for_all_rooms", f + ) + + results = {} + for row in txn_results: + # We want a single event per room, since we want to batch the + # receipts by room, event and type. + room_event = results.setdefault( + row["room_id"], + {"type": "m.receipt", "room_id": row["room_id"], "content": {}}, + ) + + # The content is of the form: + # {"$foo:bar": { "read": { "@user:host": }, .. }, .. } + event_entry = room_event["content"].setdefault(row["event_id"], {}) + receipt_type = event_entry.setdefault(row["receipt_type"], {}) + + receipt_type[row["user_id"]] = db_to_json(row["data"]) + + return results + + async def get_users_sent_receipts_between( self, last_id: int, current_id: int ) -> List[str]: From 42090bcc7cf84ae89e4aaad87872771a5995a652 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Mon, 21 Sep 2020 15:10:37 +0100 Subject: [PATCH 3/7] Call appservice handler when seeing new events in the notifier --- synapse/notifier.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/synapse/notifier.py b/synapse/notifier.py index a8fd3ef886ce..48008e2c073c 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -326,6 +326,12 @@ async def _notify_app_services(self, max_room_stream_id: int): except Exception: logger.exception("Error notifying application services of event") + async def _notify_app_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken]): + try: + await self.appservice_handler.notify_interested_services_ephemeral(stream_key, new_token) + except Exception: + logger.exception("Error notifying application services of event") + async def _notify_pusher_pool(self, max_room_stream_id: int): try: await self._pusher_pool.on_new_notifications(max_room_stream_id) @@ -364,6 +370,11 @@ def on_new_event( self.notify_replication() + # Notify appservices + run_as_background_process( + "_notify_app_services_ephemeral", self._notify_app_services_ephemeral, stream_key, new_token, + ) + def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" From 3bf1b79d3c2735cca9c0b38689b1f2701354bd52 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Mon, 21 Sep 2020 16:21:22 +0100 Subject: [PATCH 4/7] Add is_interested_in_presence func --- synapse/appservice/__init__.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 8737b34e546e..5ed9f54bc7cc 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -19,7 +19,7 @@ from synapse.api.constants import EventTypes from synapse.appservice.api import ApplicationServiceApi from synapse.types import GroupID, get_domain_from_id -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedList if TYPE_CHECKING: from synapse.storage.databases.main import DataStore @@ -241,6 +241,19 @@ async def is_interested(self, event, store=None) -> bool: return False + @cached(num_args=1, cache_context=True) + async def is_interested_in_presence(self, user_id, store, cache_context): + # Find all the rooms the sender is in + if self.is_interested_in_user(user_id.to_string()): + return True + room_ids = await store.get_rooms_for_user(user_id.to_string()) + + # Then find out if the appservice is interested in any of those rooms + for room_id in room_ids: + if await self.matches_user_in_member_list(room_id, store, cache_context): + return True + return False + def is_interested_in_user(self, user_id): return ( self._matches_regex(user_id, ApplicationService.NS_USERS) From 4392526bf0a66c484e9ca1ad5b3177c3649c9ecc Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Mon, 21 Sep 2020 16:22:11 +0100 Subject: [PATCH 5/7] Last little bits --- synapse/handlers/appservice.py | 40 +++++++++++++++++++++++++++------- synapse/notifier.py | 6 ++--- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index e8cc166fdeac..9813447903ae 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -31,9 +31,10 @@ Tuple, TypeVar, Union, + Collection, ) -from synapse.types import RoomStreamToken +from synapse.types import RoomStreamToken, UserID from synapse.api.constants import EventTypes from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import ( @@ -42,6 +43,7 @@ ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.metrics import Measure +from synapse.handlers.presence import format_user_presence_state logger = logging.getLogger(__name__) @@ -173,7 +175,7 @@ async def handle_room_events(events): finally: self.is_processing = False - async def notify_interested_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken]): + async def notify_interested_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []): services = [service for service in self.store.get_app_services() if service.supports_ephemeral] if not services or not self.notify_appservices: return @@ -185,7 +187,7 @@ async def notify_interested_services_ephemeral(self, stream_key: str, new_token: from_key = new_token - 1 typing_source = self.event_sources.sources["typing"] # Get the typing events from just before current - typing, _typing_key = await typing_source.get_new_events_as( + typing, _key = await typing_source.get_new_events_as( service=service, from_key=from_key ) @@ -193,15 +195,37 @@ async def notify_interested_services_ephemeral(self, stream_key: str, new_token: elif stream_key == "receipt_key": from_key = new_token - 1 receipts_source = self.event_sources.sources["receipt"] - receipts, _receipts_key = await receipts_source.get_new_events_as( + receipts, _key = await receipts_source.get_new_events_as( service=service, from_key=from_key ) events = receipts - elif stream_key == "presence": - # TODO: This. Presence means trying to determine all the - # users the appservice cares about, which means checking - # all the rooms the appservice is in. + elif stream_key == "presence_key": + events = [] + presence_source = self.event_sources.sources["presence"] + for user in users: + interested = await service.is_interested_in_presence(user, self.store) + if not interested: + continue + presence_events, _key = await presence_source.get_new_events( + user=user, + service=service, + from_key=None, # I don't think this is required + ) + time_now = self.clock.time_msec() + presence_events = [ + { + "type": "m.presence", + "sender": event.user_id, + "content": format_user_presence_state( + event, time_now, include_user_id=False + ), + } + for event in presence_events + ] + events = events + presence_events + elif stream_key == "to_device_key": + print("to_device_key", users) if events: # TODO: Do in background? await self.scheduler.submit_ephemeral_events_for_as(service, events) diff --git a/synapse/notifier.py b/synapse/notifier.py index 48008e2c073c..37546e6c21bd 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -326,9 +326,9 @@ async def _notify_app_services(self, max_room_stream_id: int): except Exception: logger.exception("Error notifying application services of event") - async def _notify_app_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken]): + async def _notify_app_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []): try: - await self.appservice_handler.notify_interested_services_ephemeral(stream_key, new_token) + await self.appservice_handler.notify_interested_services_ephemeral(stream_key, new_token, users) except Exception: logger.exception("Error notifying application services of event") @@ -372,7 +372,7 @@ def on_new_event( # Notify appservices run_as_background_process( - "_notify_app_services_ephemeral", self._notify_app_services_ephemeral, stream_key, new_token, + "_notify_app_services_ephemeral", self._notify_app_services_ephemeral, stream_key, new_token, users, ) def on_new_replication_data(self) -> None: From 316ad09a64b455efc18823f92619fd9c7d3a6c63 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Tue, 22 Sep 2020 11:31:59 +0100 Subject: [PATCH 6/7] Add support for device messages, start support for device lists --- synapse/appservice/api.py | 4 +- synapse/handlers/appservice.py | 76 +++++++++++++------ synapse/storage/databases/main/appservice.py | 44 ++++++++++- synapse/storage/databases/main/deviceinbox.py | 32 ++++++++ .../schema/delta/58/18as_device_stream.sql | 25 ++++++ 5 files changed, 153 insertions(+), 28 deletions(-) create mode 100644 synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 48982d2ad398..9523c3a5d966 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -201,7 +201,7 @@ async def _get() -> Optional[JsonDict]: key = (service.id, protocol) return await self.protocol_meta_cache.wrap(key, _get) - async def push_ephemeral(self, service, events): + async def push_ephemeral(self, service, events, to_device=None, device_lists=None): if service.url is None: return True if service.supports_ephemeral is False: @@ -213,7 +213,7 @@ async def push_ephemeral(self, service, events): try: await self.put_json( uri=uri, - json_body={"events": events}, + json_body={"events": events, "device_messages": to_device, "device_lists": device_lists}, args={"access_token": service.hs_token}, ) return True diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 9813447903ae..dbbde3db1844 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -201,35 +201,61 @@ async def notify_interested_services_ephemeral(self, stream_key: str, new_token: ) events = receipts elif stream_key == "presence_key": - events = [] - presence_source = self.event_sources.sources["presence"] - for user in users: - interested = await service.is_interested_in_presence(user, self.store) - if not interested: - continue - presence_events, _key = await presence_source.get_new_events( - user=user, - service=service, - from_key=None, # I don't think this is required - ) - time_now = self.clock.time_msec() - presence_events = [ - { - "type": "m.presence", - "sender": event.user_id, - "content": format_user_presence_state( - event, time_now, include_user_id=False - ), - } - for event in presence_events - ] - events = events + presence_events + events = await self._handle_as_presence(service, users) + elif stream_key == "device_list_key": + # Check if the device lists have changed for any of the users we are interested in + print("device_list_key", users) elif stream_key == "to_device_key": - print("to_device_key", users) + # Check the inbox for any users the bridge owns + events, to_device_token = await self._handle_to_device(service, users, new_token) + if events: + # TODO: Do in background? + await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token) + if stream_key == "to_device_key": + # Update database with new token + await self.store.set_device_messages_token_for_appservice(service, to_device_token) + return if events: # TODO: Do in background? - await self.scheduler.submit_ephemeral_events_for_as(service, events) + await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token) + + async def _handle_device_list(self, service, users, token): + if not any([True for u in users if service.is_interested_in_user(u)]): + return False + + async def _handle_to_device(self, service, users, token): + if not any([True for u in users if service.is_interested_in_user(u)]): + return False + + since_token = await self.store.get_device_messages_token_for_appservice(service) + messages, new_token = await self.store.get_new_messages_for_as(service, since_token, token) + return messages, new_token + + async def _handle_as_presence(self, service, users): + events = [] + presence_source = self.event_sources.sources["presence"] + for user in users: + interested = await service.is_interested_in_presence(user, self.store) + if not interested: + continue + presence_events, _key = await presence_source.get_new_events( + user=user, + service=service, + from_key=None, # TODO: I don't think this is required? + ) + time_now = self.clock.time_msec() + presence_events = [ + { + "type": "m.presence", + "sender": event.user_id, + "content": format_user_presence_state( + event, time_now, include_user_id=False + ), + } + for event in presence_events + ] + events = events + presence_events async def query_user_exists(self, user_id): """Check if any application service knows this user_id exists. diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 85f6b1e3fdf7..1ebe4504fdf1 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -320,7 +320,7 @@ def set_appservice_last_pos_txn(txn): ) async def get_new_events_for_appservice(self, current_id, limit): - """Get all new evnets""" + """Get all new events for an appservice""" def get_new_events_for_appservice_txn(txn): sql = ( @@ -350,6 +350,48 @@ def get_new_events_for_appservice_txn(txn): events = await self.get_events_as_list(event_ids) return upper_bound, events + + async def get_device_messages_token_for_appservice(self, service): + txn.execute( + "SELECT device_message_stream_id FROM application_services_state WHERE as_id=?", + (service.id,), + ) + last_txn_id = txn.fetchone() + if last_txn_id is None or last_txn_id[0] is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) # select 'last_txn' col + + async def set_device_messages_token_for_appservice(self, service, pos) -> None: + def set_appservice_last_pos_txn(txn): + txn.execute( + "UPDATE application_services_state SET device_message_stream_id = ? WHERE as_id=?", (pos, service.id) + ) + + await self.db_pool.runInteraction( + "set_device_messages_token_for_appservice", set_appservice_last_pos_txn + ) + + async def get_device_list_token_for_appservice(self, service): + txn.execute( + "SELECT device_list_stream_id FROM application_services_state WHERE as_id=?", + (service.id,), + ) + last_txn_id = txn.fetchone() + if last_txn_id is None or last_txn_id[0] is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) # select 'last_txn' col + + async def set_device_list_token_for_appservice(self, service, pos) -> None: + def set_appservice_last_pos_txn(txn): + txn.execute( + "UPDATE application_services_state SET device_list_stream_id = ?", (pos, service.id) + ) + + await self.db_pool.runInteraction( + "set_device_list_token_for_appservice", set_appservice_last_pos_txn + ) class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore): diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index e71217a41f3c..e4fd979a3371 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -21,6 +21,7 @@ from synapse.storage.database import DatabasePool from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache +from synapse.appservice import ApplicationService logger = logging.getLogger(__name__) @@ -29,6 +30,37 @@ class DeviceInboxWorkerStore(SQLBaseStore): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() + async def get_new_messages_for_as( + self, + service: ApplicationService, + last_stream_id: int, + current_stream_id: int, + limit: int = 100, + ) -> Tuple[List[dict], int]: + def get_new_messages_for_device_txn(txn): + sql = ( + "SELECT stream_id, message_json, device_id, user_id FROM device_inbox" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute( + sql, (last_stream_id, current_stream_id, limit) + ) + messages = [] + + for row in txn: + stream_pos = row[0] + if service.is_interested_in_user(row.user_id): + messages.append(db_to_json(row[1])) + if len(messages) < limit: + stream_pos = current_stream_id + return messages, stream_pos + + return await self.db_pool.runInteraction( + "get_new_messages_for_device", get_new_messages_for_device_txn + ) + async def get_new_messages_for_device( self, user_id: str, diff --git a/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql b/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql new file mode 100644 index 000000000000..d4abfb6183b6 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql @@ -0,0 +1,25 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /* for some reason, we have accumulated duplicate entries in + * device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less + * efficient. + */ + +ALTER TABLE application_services_state + ADD COLUMN device_list_stream_id INT; + +ALTER TABLE application_services_state + ADD COLUMN device_message_stream_id INT; \ No newline at end of file From f807c7291f72db6894747258e439ad662a1bf56e Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Wed, 30 Sep 2020 12:55:57 +0100 Subject: [PATCH 7/7] Add basic support for device list updates --- synapse/appservice/__init__.py | 2 +- synapse/appservice/api.py | 6 +- synapse/handlers/appservice.py | 141 +++++++++++------- synapse/handlers/typing.py | 9 +- synapse/notifier.py | 17 ++- synapse/storage/databases/main/appservice.py | 51 +++---- synapse/storage/databases/main/deviceinbox.py | 13 +- synapse/storage/databases/main/devices.py | 26 ++++ synapse/storage/databases/main/receipts.py | 5 +- 9 files changed, 167 insertions(+), 103 deletions(-) diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 5ed9f54bc7cc..a93c08ca8155 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -19,7 +19,7 @@ from synapse.api.constants import EventTypes from synapse.appservice.api import ApplicationServiceApi from synapse.types import GroupID, get_domain_from_id -from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches.descriptors import cached if TYPE_CHECKING: from synapse.storage.databases.main import DataStore diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 2c1dae59842a..364c1a88f330 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -213,7 +213,11 @@ async def push_ephemeral(self, service, events, to_device=None, device_lists=Non try: await self.put_json( uri=uri, - json_body={"events": events, "device_messages": to_device, "device_lists": device_lists}, + json_body={ + "events": events, + "device_messages": to_device, + "device_lists": device_lists, + }, args={"access_token": service.hs_token}, ) return True diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index dbbde3db1844..6abc2891cfd5 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -14,36 +14,24 @@ # limitations under the License. import logging +from typing import Collection, List, Union from prometheus_client import Counter from twisted.internet import defer import synapse -from typing import ( - Awaitable, - Callable, - Dict, - Iterable, - List, - Optional, - Set, - Tuple, - TypeVar, - Union, - Collection, -) - -from synapse.types import RoomStreamToken, UserID from synapse.api.constants import EventTypes +from synapse.appservice import ApplicationService +from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import ( event_processing_loop_counter, event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import RoomStreamToken, UserID from synapse.util.metrics import Measure -from synapse.handlers.presence import format_user_presence_state logger = logging.getLogger(__name__) @@ -175,8 +163,17 @@ async def handle_room_events(events): finally: self.is_processing = False - async def notify_interested_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []): - services = [service for service in self.store.get_app_services() if service.supports_ephemeral] + async def notify_interested_services_ephemeral( + self, + stream_key: str, + new_token: Union[int, RoomStreamToken], + users: Collection[UserID] = [], + ): + services = [ + service + for service in self.store.get_app_services() + if service.supports_ephemeral + ] if not services or not self.notify_appservices: return logger.info("Checking interested services for %s" % (stream_key)) @@ -184,65 +181,99 @@ async def notify_interested_services_ephemeral(self, stream_key: str, new_token: for service in services: events = [] if stream_key == "typing_key": - from_key = new_token - 1 - typing_source = self.event_sources.sources["typing"] - # Get the typing events from just before current - typing, _key = await typing_source.get_new_events_as( - service=service, - from_key=from_key - ) - events = typing + events = await self._handle_typing(service, new_token) elif stream_key == "receipt_key": - from_key = new_token - 1 - receipts_source = self.event_sources.sources["receipt"] - receipts, _key = await receipts_source.get_new_events_as( - service=service, - from_key=from_key - ) - events = receipts + events = await self._handle_receipts(service) elif stream_key == "presence_key": events = await self._handle_as_presence(service, users) elif stream_key == "device_list_key": # Check if the device lists have changed for any of the users we are interested in - print("device_list_key", users) + events = await self._handle_device_list(service, users, new_token) elif stream_key == "to_device_key": - # Check the inbox for any users the bridge owns - events, to_device_token = await self._handle_to_device(service, users, new_token) - if events: - # TODO: Do in background? - await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token) - if stream_key == "to_device_key": - # Update database with new token - await self.store.set_device_messages_token_for_appservice(service, to_device_token) - return + # Check the inbox for any users the bridge owns + events = await self._handle_to_device(service, users, new_token) if events: # TODO: Do in background? - await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token) + await self.scheduler.submit_ephemeral_events_for_as( + service, events, new_token + ) + # We don't persist the token for typing_key + if stream_key == "presence_key": + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) + elif stream_key == "receipt_key": + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) + elif stream_key == "to_device_key": + await self.store.set_type_stream_id_for_appservice( + service, "to_device", new_token + ) - async def _handle_device_list(self, service, users, token): - if not any([True for u in users if service.is_interested_in_user(u)]): - return False + async def _handle_typing(self, service, new_token): + typing_source = self.event_sources.sources["typing"] + # Get the typing events from just before current + typing, _key = await typing_source.get_new_events_as( + service=service, + # For performance reasons, we don't persist the previous + # token in the DB and instead fetch the latest typing information + # for appservices. + from_key=new_token - 1, + ) + return typing + + async def _handle_receipts(self, service, token: int): + from_key = await self.store.get_type_stream_id_for_appservice( + service, "read_receipt" + ) + receipts_source = self.event_sources.sources["receipt"] + receipts, _ = await receipts_source.get_new_events_as( + service=service, from_key=from_key + ) + return receipts + + async def _handle_device_list( + self, service: ApplicationService, users: List[str], new_token: int + ): + # TODO: Determine if any user have left and report those + from_token = await self.store.get_type_stream_id_for_appservice( + service, "device_list" + ) + changed_user_ids = await self.store.get_device_changes_for_as( + service, from_token, new_token + ) + # Return the + return { + "type": "m.device_list_update", + "content": {"changed": changed_user_ids,}, + } async def _handle_to_device(self, service, users, token): if not any([True for u in users if service.is_interested_in_user(u)]): return False - - since_token = await self.store.get_device_messages_token_for_appservice(service) - - messages, new_token = await self.store.get_new_messages_for_as(service, since_token, token) - return messages, new_token + + since_token = await self.store.get_type_stream_id_for_appservice( + service, "to_device" + ) + messages, _ = await self.store.get_new_messages_for_as( + service, since_token, token + ) + # This returns user_id -> device_id -> message + return messages async def _handle_as_presence(self, service, users): events = [] presence_source = self.event_sources.sources["presence"] + from_key = await self.store.get_type_stream_id_for_appservice( + service, "presence" + ) for user in users: interested = await service.is_interested_in_presence(user, self.store) if not interested: continue presence_events, _key = await presence_source.get_new_events( - user=user, - service=service, - from_key=None, # TODO: I don't think this is required? + user=user, service=service, from_key=from_key, ) time_now = self.clock.time_msec() presence_events = [ diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 1747e4c872ab..8a8f480777ad 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -431,7 +431,9 @@ def _make_event_for(self, room_id): "content": {"user_ids": list(typing)}, } - async def get_new_events_as(self, from_key, service, **kwargs): + async def get_new_events_as( + self, from_key: int, service: ApplicationService, **kwargs + ): with Measure(self.clock, "typing.get_new_events_as"): from_key = int(from_key) handler = self.get_typing_handler() @@ -441,8 +443,9 @@ async def get_new_events_as(self, from_key, service, **kwargs): if handler._room_serials[room_id] <= from_key: print("Key too old") continue - # XXX: Store gut wrenching - if not await service.matches_user_in_member_list(room_id, handler.store): + if not await service.matches_user_in_member_list( + room_id, handler.store + ): continue events.append(self._make_event_for(room_id)) diff --git a/synapse/notifier.py b/synapse/notifier.py index b6b231c15d72..7396fe96c577 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -329,9 +329,16 @@ async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): except Exception: logger.exception("Error notifying application services of event") - async def _notify_app_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []): + async def _notify_app_services_ephemeral( + self, + stream_key: str, + new_token: Union[int, RoomStreamToken], + users: Collection[UserID] = [], + ): try: - await self.appservice_handler.notify_interested_services_ephemeral(stream_key, new_token, users) + await self.appservice_handler.notify_interested_services_ephemeral( + stream_key, new_token, users + ) except Exception: logger.exception("Error notifying application services of event") @@ -375,7 +382,11 @@ def on_new_event( # Notify appservices run_as_background_process( - "_notify_app_services_ephemeral", self._notify_app_services_ephemeral, stream_key, new_token, users, + "_notify_app_services_ephemeral", + self._notify_app_services_ephemeral, + stream_key, + new_token, + users, ) def on_new_replication_data(self) -> None: diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 1ebe4504fdf1..91c0b52b34b6 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -350,47 +350,36 @@ def get_new_events_for_appservice_txn(txn): events = await self.get_events_as_list(event_ids) return upper_bound, events - - async def get_device_messages_token_for_appservice(self, service): - txn.execute( - "SELECT device_message_stream_id FROM application_services_state WHERE as_id=?", - (service.id,), - ) - last_txn_id = txn.fetchone() - if last_txn_id is None or last_txn_id[0] is None: # no row exists - return 0 - else: - return int(last_txn_id[0]) # select 'last_txn' col - async def set_device_messages_token_for_appservice(self, service, pos) -> None: - def set_appservice_last_pos_txn(txn): + async def get_type_stream_id_for_appservice(self, service, type: str) -> int: + def get_type_stream_id_for_appservice_txn(txn): + stream_id_type = "%s_stream_id" % type txn.execute( - "UPDATE application_services_state SET device_message_stream_id = ? WHERE as_id=?", (pos, service.id) + "SELECT ? FROM application_services_state WHERE as_id=?", + (stream_id_type, service.id,), ) + last_txn_id = txn.fetchone() + if last_txn_id is None or last_txn_id[0] is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) - await self.db_pool.runInteraction( - "set_device_messages_token_for_appservice", set_appservice_last_pos_txn - ) - - async def get_device_list_token_for_appservice(self, service): - txn.execute( - "SELECT device_list_stream_id FROM application_services_state WHERE as_id=?", - (service.id,), + return await self.db_pool.runInteraction( + "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn ) - last_txn_id = txn.fetchone() - if last_txn_id is None or last_txn_id[0] is None: # no row exists - return 0 - else: - return int(last_txn_id[0]) # select 'last_txn' col - async def set_device_list_token_for_appservice(self, service, pos) -> None: - def set_appservice_last_pos_txn(txn): + async def set_type_stream_id_for_appservice( + self, service, type: str, pos: int + ) -> None: + def set_type_stream_id_for_appservice_txn(txn): + stream_id_type = "%s_stream_id" % type txn.execute( - "UPDATE application_services_state SET device_list_stream_id = ?", (pos, service.id) + "UPDATE ? SET device_list_stream_id = ? WHERE as_id=?", + (stream_id_type, pos, service.id), ) await self.db_pool.runInteraction( - "set_device_list_token_for_appservice", set_appservice_last_pos_txn + "set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn ) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 2d151b913488..8897e27b1fe7 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -16,12 +16,12 @@ import logging from typing import List, Tuple +from synapse.appservice import ApplicationService from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache -from synapse.appservice import ApplicationService logger = logging.getLogger(__name__) @@ -44,15 +44,18 @@ def get_new_messages_for_device_txn(txn): " ORDER BY stream_id ASC" " LIMIT ?" ) - txn.execute( - sql, (last_stream_id, current_stream_id, limit) - ) + txn.execute(sql, (last_stream_id, current_stream_id, limit)) messages = [] for row in txn: stream_pos = row[0] if service.is_interested_in_user(row.user_id): - messages.append(db_to_json(row[1])) + msg = db_to_json(row[1]) + msg.recipient = { + "device_id": row.device_id, + "user_id": row.user_id, + } + messages.append(msg) if len(messages) < limit: stream_pos = current_stream_id return messages, stream_pos diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index fdf394c61205..bf32cc6c0654 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -19,6 +19,7 @@ from typing import Any, Dict, Iterable, List, Optional, Set, Tuple from synapse.api.errors import Codes, StoreError +from synapse.appservice import ApplicationService from synapse.logging.opentracing import ( get_active_span_text_map, set_tag, @@ -525,6 +526,31 @@ def _get_users_whose_devices_changed_txn(txn): "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn ) + async def get_device_changes_for_as( + self, + service: ApplicationService, + last_stream_id: int, + current_stream_id: int, + limit: int = 100, + ) -> Tuple[List[dict], int]: + def get_device_changes_for_as_txn(txn): + sql = ( + "SELECT DISTINCT user_ids FROM device_lists_stream" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute(sql, (last_stream_id, current_stream_id, limit)) + rows = txn.fetchall() + users = [] + for user in db_to_json(rows[0]): + if await service.is_interested_in_presence(user): + users.append(user) + + return await self.db_pool.runInteraction( + "get_device_changes_for_as", get_device_changes_for_as_txn + ) + async def get_users_whose_signatures_changed( self, user_id: str, from_key: int ) -> Set[str]: diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index c10a16ffa3c7..d26c315ed4ea 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -283,9 +283,7 @@ def f(txn): } return results - @cached( - num_args=2, - ) + @cached(num_args=2,) async def _get_linearized_receipts_for_all_rooms(self, to_key, from_key=None): def f(txn): if from_key: @@ -326,7 +324,6 @@ def f(txn): return results - async def get_users_sent_receipts_between( self, last_id: int, current_id: int ) -> List[str]: