Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

WIP: Send ephemeral events to appservices #8366

Closed
wants to merge 8 commits into from
Closed
19 changes: 17 additions & 2 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def __init__(
protocols=None,
rate_limited=True,
ip_range_whitelist=None,
supports_ephemeral=False,
):
self.token = token
self.url = (
Expand All @@ -102,6 +103,7 @@ def __init__(
self.namespaces = self._check_namespaces(namespaces)
self.id = id
self.ip_range_whitelist = ip_range_whitelist
self.supports_ephemeral = supports_ephemeral

if "|" in self.id:
raise Exception("application service ID cannot contain '|' character")
Expand Down Expand Up @@ -188,11 +190,11 @@ async def _matches_user(self, event, store):
if not store:
return False

does_match = await self._matches_user_in_member_list(event.room_id, store)
does_match = await self.matches_user_in_member_list(event.room_id, store)
return does_match

@cached(num_args=1, cache_context=True)
async def _matches_user_in_member_list(self, room_id, store, cache_context):
async def matches_user_in_member_list(self, room_id, store, cache_context):
member_list = await store.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
)
Expand Down Expand Up @@ -239,6 +241,19 @@ async def is_interested(self, event, store=None) -> bool:

return False

@cached(num_args=1, cache_context=True)
async def is_interested_in_presence(self, user_id, store, cache_context):
# Find all the rooms the sender is in
if self.is_interested_in_user(user_id.to_string()):
return True
room_ids = await store.get_rooms_for_user(user_id.to_string())

# Then find out if the appservice is interested in any of those rooms
for room_id in room_ids:
if await self.matches_user_in_member_list(room_id, store, cache_context):
return True
return False

def is_interested_in_user(self, user_id):
return (
self._matches_regex(user_id, ApplicationService.NS_USERS)
Expand Down
26 changes: 26 additions & 0 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,32 @@ async def _get() -> Optional[JsonDict]:
key = (service.id, protocol)
return await self.protocol_meta_cache.wrap(key, _get)

async def push_ephemeral(self, service, events, to_device=None, device_lists=None):
if service.url is None:
return True
if service.supports_ephemeral is False:
return True

uri = service.url + (
"%s/uk.half-shot.appservice/ephemeral" % APP_SERVICE_PREFIX
)
try:
await self.put_json(
uri=uri,
json_body={
"events": events,
"device_messages": to_device,
"device_lists": device_lists,
},
args={"access_token": service.hs_token},
)
return True
except CodeMessageException as e:
logger.warning("push_ephemeral to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("push_ephemeral to %s threw exception %s", uri, ex)
return False

async def push_bulk(self, service, events, txn_id=None):
if service.url is None:
return True
Expand Down
8 changes: 6 additions & 2 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ async def start(self):
def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)

async def submit_ephemeral_events_for_as(self, service, events):
if self.txn_ctrl.is_service_up(service):
await self.as_api.push_ephemeral(service, events)


class _ServiceQueuer:
"""Queue of events waiting to be sent to appservices.
Expand Down Expand Up @@ -161,7 +165,7 @@ def __init__(self, clock, store, as_api):
async def send(self, service, events):
try:
txn = await self.store.create_appservice_txn(service=service, events=events)
service_is_up = await self._is_service_up(service)
service_is_up = await self.is_service_up(service)
if service_is_up:
sent = await txn.send(self.as_api)
if sent:
Expand Down Expand Up @@ -204,7 +208,7 @@ def start_recoverer(self, service):
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))

async def _is_service_up(self, service):
async def is_service_up(self, service):
state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None

Expand Down
3 changes: 3 additions & 0 deletions synapse/config/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def _load_appservice(hostname, as_info, config_filename):
if as_info.get("ip_range_whitelist"):
ip_range_whitelist = IPSet(as_info.get("ip_range_whitelist"))

supports_ephemeral = as_info.get("uk.half-shot.appservice.push_ephemeral", False)

return ApplicationService(
token=as_info["as_token"],
hostname=hostname,
Expand All @@ -168,6 +170,7 @@ def _load_appservice(hostname, as_info, config_filename):
hs_token=as_info["hs_token"],
sender=user_id,
id=as_info["id"],
supports_ephemeral=supports_ephemeral,
protocols=protocols,
rate_limited=rate_limited,
ip_range_whitelist=ip_range_whitelist,
Expand Down
130 changes: 130 additions & 0 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@
# limitations under the License.

import logging
from typing import Collection, List, Union

from prometheus_client import Counter

from twisted.internet import defer

import synapse
from synapse.api.constants import EventTypes
from synapse.appservice import ApplicationService
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken, UserID
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)
Expand All @@ -43,6 +47,7 @@ def __init__(self, hs):
self.started_scheduler = False
self.clock = hs.get_clock()
self.notify_appservices = hs.config.notify_appservices
self.event_sources = hs.get_event_sources()

self.current_max = 0
self.is_processing = False
Expand Down Expand Up @@ -158,6 +163,131 @@ async def handle_room_events(events):
finally:
self.is_processing = False

async def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
):
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services or not self.notify_appservices:
return
logger.info("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
events = []
if stream_key == "typing_key":
events = await self._handle_typing(service, new_token)
elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
elif stream_key == "presence_key":
events = await self._handle_as_presence(service, users)
elif stream_key == "device_list_key":
# Check if the device lists have changed for any of the users we are interested in
events = await self._handle_device_list(service, users, new_token)
elif stream_key == "to_device_key":
# Check the inbox for any users the bridge owns
events = await self._handle_to_device(service, users, new_token)
if events:
# TODO: Do in background?
await self.scheduler.submit_ephemeral_events_for_as(
service, events, new_token
)
# We don't persist the token for typing_key
if stream_key == "presence_key":
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
elif stream_key == "receipt_key":
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
elif stream_key == "to_device_key":
await self.store.set_type_stream_id_for_appservice(
service, "to_device", new_token
)

async def _handle_typing(self, service, new_token):
typing_source = self.event_sources.sources["typing"]
# Get the typing events from just before current
typing, _key = await typing_source.get_new_events_as(
service=service,
# For performance reasons, we don't persist the previous
# token in the DB and instead fetch the latest typing information
# for appservices.
from_key=new_token - 1,
)
return typing

async def _handle_receipts(self, service, token: int):
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
receipts_source = self.event_sources.sources["receipt"]
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
)
return receipts

async def _handle_device_list(
self, service: ApplicationService, users: List[str], new_token: int
):
# TODO: Determine if any user have left and report those
from_token = await self.store.get_type_stream_id_for_appservice(
service, "device_list"
)
changed_user_ids = await self.store.get_device_changes_for_as(
service, from_token, new_token
)
# Return the
return {
"type": "m.device_list_update",
"content": {"changed": changed_user_ids,},
}

async def _handle_to_device(self, service, users, token):
if not any([True for u in users if service.is_interested_in_user(u)]):
return False

since_token = await self.store.get_type_stream_id_for_appservice(
service, "to_device"
)
messages, _ = await self.store.get_new_messages_for_as(
service, since_token, token
)
# This returns user_id -> device_id -> message
return messages

async def _handle_as_presence(self, service, users):
events = []
presence_source = self.event_sources.sources["presence"]
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
for user in users:
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
presence_events, _key = await presence_source.get_new_events(
user=user, service=service, from_key=from_key,
)
time_now = self.clock.time_msec()
presence_events = [
{
"type": "m.presence",
"sender": event.user_id,
"content": format_user_presence_state(
event, time_now, include_user_id=False
),
}
for event in presence_events
]
events = events + presence_events

async def query_user_exists(self, user_id):
"""Check if any application service knows this user_id exists.

Expand Down
22 changes: 22 additions & 0 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,27 @@ async def get_new_events(self, from_key, room_ids, **kwargs):

return (events, to_key)

async def get_new_events_as(self, from_key, service, **kwargs):
from_key = int(from_key)
to_key = self.get_current_key()

if from_key == to_key:
return [], to_key

# We first need to fetch all new receipts
rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
from_key=from_key, to_key=to_key
)

# Then filter down to rooms that the AS can read
events = []
for room_id, event in rooms_to_events.items():
if not await service.matches_user_in_member_list(room_id, self.store):
continue

events.append(event)

return (events, to_key)

def get_current_key(self, direction="f"):
return self.store.get_max_receipt_stream_id()
22 changes: 22 additions & 0 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import TYPE_CHECKING, List, Set, Tuple

from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
from synapse.types import UserID, get_domain_from_id
Expand Down Expand Up @@ -430,6 +431,27 @@ def _make_event_for(self, room_id):
"content": {"user_ids": list(typing)},
}

async def get_new_events_as(
self, from_key: int, service: ApplicationService, **kwargs
):
with Measure(self.clock, "typing.get_new_events_as"):
from_key = int(from_key)
handler = self.get_typing_handler()

events = []
for room_id in handler._room_serials.keys():
if handler._room_serials[room_id] <= from_key:
print("Key too old")
continue
if not await service.matches_user_in_member_list(
room_id, handler.store
):
continue

events.append(self._make_event_for(room_id))

return (events, handler._latest_room_serial)

async def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
Expand Down
22 changes: 22 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,19 @@ async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
):
try:
await self.appservice_handler.notify_interested_services_ephemeral(
stream_key, new_token, users
)
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
Expand Down Expand Up @@ -367,6 +380,15 @@ def on_new_event(

self.notify_replication()

# Notify appservices
run_as_background_process(
"_notify_app_services_ephemeral",
self._notify_app_services_ephemeral,
stream_key,
new_token,
users,
)

def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happend
without waking up any of the normal user event streams"""
Expand Down
Loading