Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Send to-device messages to application services #11215

Merged
merged 49 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
7fbfedb
Add experimental config option to send to-device messages to AS's
anoadragon453 Nov 4, 2021
b7a44d4
Add a new ephemeral AS handler for to_device message edus
anoadragon453 Nov 5, 2021
78bd5ea
Allow setting/getting stream id per appservice for to-device messages
anoadragon453 Nov 5, 2021
7899f82
Add database method to fetch to-device messages by user_ids from db
anoadragon453 Nov 5, 2021
103f410
Add a to_device_stream_id column to the application_services_state table
anoadragon453 Nov 5, 2021
e914f1d
Add tests
anoadragon453 Nov 12, 2021
2930fe6
Changelog
anoadragon453 Nov 5, 2021
f65846b
Make msc2409_to_device_messages_enabled private; remove unnecessary c…
anoadragon453 Nov 19, 2021
ce020c3
Move stream filter back into AppserviceHandler
anoadragon453 Nov 19, 2021
8f1183c
Broaden type hints; update comment
anoadragon453 Nov 19, 2021
401cb2b
Deduplicate ephemeral events to send conditional
anoadragon453 Nov 19, 2021
179dd5a
_handle_to_device -> _get_to_device_messages
anoadragon453 Nov 19, 2021
8b0bbc1
Rename ApplicationServiceEphemeralEventsTestCase
anoadragon453 Nov 19, 2021
31c4b40
Rename user1, user2 in tests to something more useful
anoadragon453 Nov 19, 2021
bd9d963
Simplify registration of appservices in tests
anoadragon453 Nov 19, 2021
8f8226a
Fix existing unit tests
anoadragon453 Nov 22, 2021
b4a4b45
rename set_type_stream_id_for_appservice -> set_appservice_stream_typ…
anoadragon453 Nov 24, 2021
c691ef0
Add some FIXME comments
anoadragon453 Nov 24, 2021
7cf6ad9
Add comment on why we don't NOT NULL to_device_stream_id
anoadragon453 Nov 24, 2021
6d68b8a
Refactor and generalise the sending of arbitrary fields over AS trans…
anoadragon453 Dec 3, 2021
13b25cf
Fix tests to mock _TransactionController.send of ApplicationServiceSc…
anoadragon453 Dec 3, 2021
275e1e0
Add to-device messages as their own special section in AS txns
anoadragon453 Dec 3, 2021
403490d
Insert to-device messages into the new to-device part of AS txns
anoadragon453 Dec 3, 2021
385b3bf
Modify tests to handle new location of to-device messages in AS txns
anoadragon453 Dec 3, 2021
c0b157d
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Dec 7, 2021
ba91438
Fix calls to create_appservice_txn in tests
anoadragon453 Dec 7, 2021
0685021
Update synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_…
anoadragon453 Jan 11, 2022
e7f6732
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Jan 11, 2022
3d1661f
rename recipient_user_id_device_id_to_messages -> recipient_device_to…
anoadragon453 Jan 11, 2022
0ac079b
lint
anoadragon453 Jan 11, 2022
822e92a
Refactor storage methods to retrieve to-device messages
anoadragon453 Jan 25, 2022
25488fa
Update old references to get_new_messages_for_device
anoadragon453 Jan 25, 2022
026cb8a
Don't query for to-device messages without a device
anoadragon453 Jan 26, 2022
1121674
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Jan 26, 2022
8129657
Fix shape of simple_insert_many argument
anoadragon453 Jan 26, 2022
de48ab4
Only import MemoryReactor if type-checking in test_scheduler; fix old…
anoadragon453 Jan 26, 2022
d8b8f74
Update _txn function to match outer method name
anoadragon453 Jan 26, 2022
ced1314
Clarify users arg in _get_to_device_messages docstring
anoadragon453 Jan 28, 2022
c749fcb
assert stream_id returned by get_device_messages is as expected
anoadragon453 Jan 28, 2022
24512fb
Fix get_messages_for_device return type documentation
anoadragon453 Jan 28, 2022
24bc3c5
Clean up limit checking
anoadragon453 Jan 28, 2022
30b74a5
Move DB migration to schema v68
anoadragon453 Jan 28, 2022
80c3721
Accept iterables in enqueue_for_appservice
anoadragon453 Jan 28, 2022
3d8e50d
wording fixes
anoadragon453 Jan 28, 2022
50ebcb9
Apply suggestions from code review
anoadragon453 Jan 28, 2022
089e041
Only allow querying either a single device ID, or all device IDs of u…
anoadragon453 Jan 31, 2022
d06781e
Apply suggestions from code review
anoadragon453 Feb 1, 2022
c334eef
Require a user id/device id pair if passing 'limit'
anoadragon453 Feb 1, 2022
bf93ec4
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Feb 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11215.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for sending to-device messages to application services, as specified by [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409). Disabled by default.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we note the experimental configuration flag to enable this?

3 changes: 3 additions & 0 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,13 @@ def __init__(
id: int,
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
self.to_device_messages = to_device_messages

async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
Expand All @@ -369,6 +371,7 @@ async def send(self, as_api: "ApplicationServiceApi") -> bool:
service=self.service,
events=self.events,
ephemeral=self.ephemeral,
to_device_messages=self.to_device_messages,
txn_id=self.id,
)

Expand Down
29 changes: 23 additions & 6 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,23 @@ async def push_bulk(
service: "ApplicationService",
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
txn_id: Optional[int] = None,
) -> bool:
"""
Push data to an application service.

Args:
service: The application service to send to.
events: The persistent events to send.
ephemeral: The ephemeral events to send.
to_device_messages: The to-device messages to send.
txn_id: An unique ID to assign to this transaction. Application services should
deduplicate transactions received with identitical IDs.

Returns:
True if the task succeeded, False if it failed.
"""
if service.url is None:
return True

Expand All @@ -237,13 +252,15 @@ async def push_bulk(
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))

# Never send ephemeral events to appservices that do not support it
body: Dict[str, List[JsonDict]] = {"events": serialized_events}
if service.supports_ephemeral:
body = {
"events": serialized_events,
"de.sorunome.msc2409.ephemeral": ephemeral,
}
else:
body = {"events": serialized_events}
body.update(
{
# TODO: Update to stable prefixes once MSC2409 completes FCP merge.
"de.sorunome.msc2409.ephemeral": ephemeral,
"de.sorunome.msc2409.to_device": to_device_messages,
}
)

try:
await self.put_json(
Expand Down
97 changes: 75 additions & 22 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,16 @@
components.
"""
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set
from typing import (
TYPE_CHECKING,
Awaitable,
Callable,
Collection,
Dict,
List,
Optional,
Set,
)

from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.appservice.api import ApplicationServiceApi
Expand All @@ -71,6 +80,9 @@
# Maximum number of ephemeral events to provide in an AS transaction.
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100

# Maximum number of to-device messages to provide in an AS transaction.
MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100


class ApplicationServiceScheduler:
"""Public facing API for this module. Does the required DI to tie the
Expand All @@ -97,15 +109,40 @@ async def start(self) -> None:
for service in services:
self.txn_ctrl.start_recoverer(service)

def submit_event_for_as(
self, service: ApplicationService, event: EventBase
def enqueue_for_appservice(
self,
appservice: ApplicationService,
events: Optional[Collection[EventBase]] = None,
ephemeral: Optional[Collection[JsonDict]] = None,
to_device_messages: Optional[Collection[JsonDict]] = None,
) -> None:
self.queuer.enqueue_event(service, event)
"""
Enqueue some data to be sent off to an application service.

def submit_ephemeral_events_for_as(
self, service: ApplicationService, events: List[JsonDict]
) -> None:
self.queuer.enqueue_ephemeral(service, events)
Args:
appservice: The application service to create and send a transaction to.
events: The persistent room events to send.
ephemeral: The ephemeral events to send.
to_device_messages: The to-device messages to send. These differ from normal
to-device messages sent to clients, as they have 'to_device_id' and
'to_user_id' fields.
"""
# We purposefully allow this method to run with empty events/ephemeral
# collections, so that callers do not need to check iterable size themselves.
if not events and not ephemeral and not to_device_messages:
return

if events:
self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
if ephemeral:
self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
if to_device_messages:
self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
to_device_messages
)

# Kick off a new application service transaction
self.queuer.start_background_request(appservice)


class _ServiceQueuer:
Expand All @@ -121,13 +158,15 @@ def __init__(self, txn_ctrl: "_TransactionController", clock: Clock):
self.queued_events: Dict[str, List[EventBase]] = {}
# dict of {service_id: [events]}
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
# dict of {service_id: [to_device_message_json]}
self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}

# the appservices which currently have a transaction in flight
self.requests_in_flight: Set[str] = set()
self.txn_ctrl = txn_ctrl
self.clock = clock

def _start_background_request(self, service: ApplicationService) -> None:
def start_background_request(self, service: ApplicationService) -> None:
# start a sender for this appservice if we don't already have one
if service.id in self.requests_in_flight:
return
Expand All @@ -136,16 +175,6 @@ def _start_background_request(self, service: ApplicationService) -> None:
"as-sender-%s" % (service.id,), self._send_request, service
)

def enqueue_event(self, service: ApplicationService, event: EventBase) -> None:
self.queued_events.setdefault(service.id, []).append(event)
self._start_background_request(service)

def enqueue_ephemeral(
self, service: ApplicationService, events: List[JsonDict]
) -> None:
self.queued_ephemeral.setdefault(service.id, []).extend(events)
self._start_background_request(service)

async def _send_request(self, service: ApplicationService) -> None:
# sanity-check: we shouldn't get here if this service already has a sender
# running.
Expand All @@ -162,11 +191,21 @@ async def _send_request(self, service: ApplicationService) -> None:
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]

if not events and not ephemeral:
all_to_device_messages = self.queued_to_device_messages.get(
service.id, []
)
to_device_messages_to_send = all_to_device_messages[
:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION
]
del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]

if not events and not ephemeral and not to_device_messages_to_send:
return

try:
await self.txn_ctrl.send(service, events, ephemeral)
await self.txn_ctrl.send(
service, events, ephemeral, to_device_messages_to_send
)
except Exception:
logger.exception("AS request failed")
finally:
Expand Down Expand Up @@ -198,10 +237,24 @@ async def send(
service: ApplicationService,
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
to_device_messages: Optional[List[JsonDict]] = None,
) -> None:
"""
Create a transaction with the given data and send to the provided
application service.

Args:
service: The application service to send the transaction to.
events: The persistent events to include in the transaction.
ephemeral: The ephemeral events to include in the transaction.
to_device_messages: The to-device messages to include in the transaction.
"""
try:
txn = await self.store.create_appservice_txn(
service=service, events=events, ephemeral=ephemeral or []
service=service,
events=events,
ephemeral=ephemeral or [],
to_device_messages=to_device_messages or [],
)
service_is_up = await self._is_service_up(service)
if service_is_up:
Expand Down
7 changes: 7 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,10 @@ def read_config(self, config: JsonDict, **kwargs):
self.msc3202_device_masquerading_enabled: bool = experimental.get(
"msc3202_device_masquerading", False
)

# MSC2409 (this setting only relates to optionally sending to-device messages).
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
# Presence, typing and read receipt EDUs are already sent to application services that
# have opted in to receive them. If enabled, this adds to-device messages to that list.
self.msc2409_to_device_messages_enabled: bool = experimental.get(
"msc2409_to_device_messages_enabled", False
)
Loading