This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Send to-device messages to application services #11215

Merged
merged 49 commits into from
Feb 1, 2022
Changes from 9 commits
7fbfedb
Add experimental config option to send to-device messages to AS's
anoadragon453 Nov 4, 2021
b7a44d4
Add a new ephemeral AS handler for to_device message edus
anoadragon453 Nov 5, 2021
78bd5ea
Allow setting/getting stream id per appservice for to-device messages
anoadragon453 Nov 5, 2021
7899f82
Add database method to fetch to-device messages by user_ids from db
anoadragon453 Nov 5, 2021
103f410
Add a to_device_stream_id column to the application_services_state table
anoadragon453 Nov 5, 2021
e914f1d
Add tests
anoadragon453 Nov 12, 2021
2930fe6
Changelog
anoadragon453 Nov 5, 2021
f65846b
Make msc2409_to_device_messages_enabled private; remove unnecessary c…
anoadragon453 Nov 19, 2021
ce020c3
Move stream filter back into AppserviceHandler
anoadragon453 Nov 19, 2021
8f1183c
Broaden type hints; update comment
anoadragon453 Nov 19, 2021
401cb2b
Deduplicate ephemeral events to send conditional
anoadragon453 Nov 19, 2021
179dd5a
_handle_to_device -> _get_to_device_messages
anoadragon453 Nov 19, 2021
8b0bbc1
Rename ApplicationServiceEphemeralEventsTestCase
anoadragon453 Nov 19, 2021
31c4b40
Rename user1, user2 in tests to something more useful
anoadragon453 Nov 19, 2021
bd9d963
Simplify registration of appservices in tests
anoadragon453 Nov 19, 2021
8f8226a
Fix existing unit tests
anoadragon453 Nov 22, 2021
b4a4b45
rename set_type_stream_id_for_appservice -> set_appservice_stream_typ…
anoadragon453 Nov 24, 2021
c691ef0
Add some FIXME comments
anoadragon453 Nov 24, 2021
7cf6ad9
Add comment on why we don't NOT NULL to_device_stream_id
anoadragon453 Nov 24, 2021
6d68b8a
Refactor and generalise the sending of arbitrary fields over AS trans…
anoadragon453 Dec 3, 2021
13b25cf
Fix tests to mock _TransactionController.send of ApplicationServiceSc…
anoadragon453 Dec 3, 2021
275e1e0
Add to-device messages as their own special section in AS txns
anoadragon453 Dec 3, 2021
403490d
Insert to-device messages into the new to-device part of AS txns
anoadragon453 Dec 3, 2021
385b3bf
Modify tests to handle new location of to-device messages in AS txns
anoadragon453 Dec 3, 2021
c0b157d
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Dec 7, 2021
ba91438
Fix calls to create_appservice_txn in tests
anoadragon453 Dec 7, 2021
0685021
Update synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_…
anoadragon453 Jan 11, 2022
e7f6732
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Jan 11, 2022
3d1661f
rename recipient_user_id_device_id_to_messages -> recipient_device_to…
anoadragon453 Jan 11, 2022
0ac079b
lint
anoadragon453 Jan 11, 2022
822e92a
Refactor storage methods to retrieve to-device messages
anoadragon453 Jan 25, 2022
25488fa
Update old references to get_new_messages_for_device
anoadragon453 Jan 25, 2022
026cb8a
Don't query for to-device messages without a device
anoadragon453 Jan 26, 2022
1121674
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Jan 26, 2022
8129657
Fix shape of simple_insert_many argument
anoadragon453 Jan 26, 2022
de48ab4
Only import MemoryReactor if type-checking in test_scheduler; fix old…
anoadragon453 Jan 26, 2022
d8b8f74
Update _txn function to match outer method name
anoadragon453 Jan 26, 2022
ced1314
Clarify users arg in _get_to_device_messages docstring
anoadragon453 Jan 28, 2022
c749fcb
assert stream_id returned by get_device_messages is as expected
anoadragon453 Jan 28, 2022
24512fb
Fix get_messages_for_device return type documentation
anoadragon453 Jan 28, 2022
24bc3c5
Clean up limit checking
anoadragon453 Jan 28, 2022
30b74a5
Move DB migration to schema v68
anoadragon453 Jan 28, 2022
80c3721
Accept iterables in enqueue_for_appservice
anoadragon453 Jan 28, 2022
3d8e50d
wording fixes
anoadragon453 Jan 28, 2022
50ebcb9
Apply suggestions from code review
anoadragon453 Jan 28, 2022
089e041
Only allow querying either a single device ID, or all device IDs of u…
anoadragon453 Jan 31, 2022
d06781e
Apply suggestions from code review
anoadragon453 Feb 1, 2022
c334eef
Require a user id/device id pair if passing 'limit'
anoadragon453 Feb 1, 2022
bf93ec4
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Feb 1, 2022
11 changes: 6 additions & 5 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
@@ -52,8 +52,8 @@
TYPE_CHECKING,
Awaitable,
Callable,
Collection,
Dict,
Iterable,
List,
Optional,
Set,
@@ -112,12 +112,13 @@ async def start(self) -> None:
def enqueue_for_appservice(
self,
appservice: ApplicationService,
events: Optional[Iterable[EventBase]] = None,
ephemeral: Optional[Iterable[JsonDict]] = None,
to_device_messages: Optional[Iterable[JsonDict]] = None,
events: Optional[Collection[EventBase]] = None,
ephemeral: Optional[Collection[JsonDict]] = None,
to_device_messages: Optional[Collection[JsonDict]] = None,
) -> None:
"""
Enqueue some data to be sent off to an application service.

Args:
appservice: The application service to create and send a transaction to.
events: The persistent room events to send.
@@ -127,7 +128,7 @@ def enqueue_for_appservice(
'to_user_id' fields.
"""
# We purposefully allow this method to run with empty events/ephemeral
# iterables, so that callers do not need to check iterable size themselves.
# collections, so that callers do not need to check iterable size themselves.
if not events and not ephemeral and not to_device_messages:
return
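
A note on why the switch from `Iterable` to `Collection` matters for the emptiness check above: `not events` only tests object truthiness, and an iterator or generator is truthy even when it would yield nothing. A minimal sketch, using hypothetical helper names that are not part of this PR:

```python
from typing import Collection, Iterable, List, Optional


def has_work_iterable(events: Optional[Iterable[str]]) -> bool:
    # Truthiness of a generator says nothing about whether it yields items.
    return bool(events)


def has_work_collection(events: Optional[Collection[str]]) -> bool:
    # Collections define __len__, so an empty one is falsy.
    return bool(events)


empty_generator = (e for e in [])
empty_list: List[str] = []

# The empty generator is (wrongly) treated as pending work.
generator_counts_as_work = has_work_iterable(empty_generator)
# The empty list is correctly treated as nothing to send.
list_counts_as_work = has_work_collection(empty_list)
```

With `Collection` in the signature, the early return behaves as the comment describes, and callers passing a lazy iterable are flagged by the type checker instead of silently bypassing the check.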

3 changes: 1 addition & 2 deletions synapse/config/experimental.py
@@ -57,8 +57,7 @@ def read_config(self, config: JsonDict, **kwargs):

# MSC2409 (this setting only relates to optionally sending to-device messages).
# Presence, typing and read receipt EDUs are already sent to application services that
# have opted in to receive them. This setting, if enabled, adds to-device messages
# to that list.
# have opted in to receive them. If enabled, this adds to-device messages to that list.
self.msc2409_to_device_messages_enabled: bool = experimental.get(
"msc2409_to_device_messages_enabled", False
)
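
For reference, the lookup above defaults to `False` when the option is absent. A rough standalone sketch of that behaviour follows; the `experimental_features` parent key and the `read_to_device_flag` helper are assumptions for illustration and do not appear in this hunk:

```python
from typing import Any, Dict


def read_to_device_flag(config: Dict[str, Any]) -> bool:
    # Assumption: experimental options are nested under "experimental_features".
    experimental: Dict[str, Any] = config.get("experimental_features") or {}
    # Mirrors the hunk above: the option is off unless explicitly enabled.
    return bool(experimental.get("msc2409_to_device_messages_enabled", False))


default_value = read_to_device_flag({})
enabled_value = read_to_device_flag(
    {"experimental_features": {"msc2409_to_device_messages_enabled": True}}
)
```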
7 changes: 4 additions & 3 deletions synapse/handlers/appservice.py
@@ -477,16 +477,17 @@ async def _get_to_device_messages(
) -> List[JsonDict]:
"""
Given an application service, determine which events it should receive
from those between the last-recorded typing event stream token for this
from those between the last-recorded to-device message stream token for this
appservice and the given stream token.

Args:
service: The application service to check for which events it should receive.
new_token: The latest to-device event stream token.
users: The users that should receive new to-device messages.
users: The users to be notified for the new to-device messages
(ie, the recipients of the messages).

Returns:
A list of JSON dictionaries containing data derived from the typing events
A list of JSON dictionaries containing data derived from the to-device events
that should be sent to the given application service.
"""
# Get the stream token that this application service has processed up until
2 changes: 1 addition & 1 deletion synapse/notifier.py
@@ -462,7 +462,7 @@ def on_new_event(
)
except Exception:
logger.exception(
"Error notifying application services of ephemeral event"
"Error notifying application services of ephemeral events"
)

def on_new_replication_data(self) -> None:
81 changes: 48 additions & 33 deletions synapse/storage/databases/main/deviceinbox.py
@@ -157,14 +157,21 @@ async def get_messages_for_user_devices(
Returns:
A dictionary of (user id, device id) -> list of to-device messages.
"""
# We expect the stream ID returned by _get_new_device_messages to always
# return to_stream_id. So, no need to return it from this function.
user_id_device_id_to_messages, _ = await self._get_device_messages(
# We expect the stream ID returned by _get_device_messages to always
# be to_stream_id. So, no need to return it from this function.
(
user_id_device_id_to_messages,
last_processed_stream_id,
) = await self._get_device_messages(
user_ids=user_ids,
from_stream_id=from_stream_id,
to_stream_id=to_stream_id,
)

assert (
last_processed_stream_id == to_stream_id
), "Expected _get_device_messages to process all to-device messages up to `to_stream_id`"

return user_id_device_id_to_messages

async def get_messages_for_device(
@@ -190,7 +197,8 @@ async def get_messages_for_device(

Returns:
A tuple containing:
* A dictionary of (user id, device id) -> list of to-device messages.
* A list of to-device messages within the given stream id range intended for
the given user / device combo.
* The last-processed stream ID. Subsequent calls of this function with the
same device should pass this value as 'from_stream_id'.
"""
@@ -199,7 +207,7 @@ async def get_messages_for_device(
last_processed_stream_id,
) = await self._get_device_messages(
user_ids=[user_id],
device_ids=[device_id],
device_id=device_id,
from_stream_id=from_stream_id,
to_stream_id=to_stream_id,
limit=limit,
@@ -210,7 +218,7 @@ async def get_messages_for_device(
return [], to_stream_id

# Extract the messages, no need to return the user and device ID again
to_device_messages = list(user_id_device_id_to_messages.values())[0]
to_device_messages = user_id_device_id_to_messages.get((user_id, device_id), [])

return to_device_messages, last_processed_stream_id
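
To illustrate how a caller might use the returned stream ID, here is a minimal in-memory sketch of the paging contract described in the docstring (exclusive lower bound, inclusive upper bound, optional `limit`). `INBOX`, `drain`, and this simplified synchronous `get_messages_for_device` are hypothetical stand-ins, not the storage method from this PR:

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical inbox for a single user/device pair: stream ID -> message.
INBOX: Dict[int, dict] = {i: {"content": f"msg {i}"} for i in range(1, 8)}


def get_messages_for_device(
    from_stream_id: int, to_stream_id: int, limit: Optional[int] = None
) -> Tuple[List[dict], int]:
    # Lower bound is exclusive, upper bound is inclusive.
    rows = [
        (stream_id, message)
        for stream_id, message in sorted(INBOX.items())
        if from_stream_id < stream_id <= to_stream_id
    ]
    if limit is not None:
        rows = rows[:limit]
    messages = [message for _, message in rows]
    if limit is not None and rows and len(rows) == limit:
        # The limit was hit: there may be more, so report where we stopped.
        return messages, rows[-1][0]
    # Everything up to to_stream_id has been processed.
    return messages, to_stream_id


def drain(to_stream_id: int, page_size: int) -> List[dict]:
    collected: List[dict] = []
    position = 0
    while position < to_stream_id:
        # Feed the returned position back in as the next exclusive lower bound.
        page, position = get_messages_for_device(position, to_stream_id, page_size)
        collected.extend(page)
    return collected
```

Calling `drain(7, 3)` pages through the inbox three messages at a time and stops once the returned position reaches `to_stream_id`.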

@@ -219,7 +227,7 @@ async def _get_device_messages(
user_ids: Collection[str],
from_stream_id: int,
to_stream_id: int,
device_ids: Optional[Collection[str]] = None,
device_id: Optional[str] = None,
limit: Optional[int] = None,
) -> Tuple[Dict[Tuple[str, str], List[JsonDict]], int]:
"""
@@ -230,7 +238,7 @@ async def _get_device_messages(

Note that a stream ID can be shared by multiple copies of the same message with
different recipient devices. Stream IDs are only unique in the context of a single
user ID / device ID pair Thus, applying a limit (of messages to return) when working
user ID / device ID pair. Thus, applying a limit (of messages to return) when working
with a sliding window of stream IDs is only possible when querying messages of a
single user device.

@@ -240,8 +248,9 @@ async def _get_device_messages(
user_ids: The user IDs to filter device messages by.
from_stream_id: The lower boundary of stream id to filter with (exclusive).
to_stream_id: The upper boundary of stream id to filter with (inclusive).
device_ids: If provided, only messages destined for these device IDs will be returned.
If not provided, all device IDs for the given user IDs will be used.
device_id: A device ID to query to-device messages for. If not provided, to-device
messages from all device IDs for the given user IDs will be queried. May not be
provided if `user_ids` contains more than one entry.
limit: The maximum number of to-device messages to return. Can only be used when
passing a single user ID / device ID tuple.

@@ -252,28 +261,32 @@ async def _get_device_messages(
there may be more messages to retrieve. If `limit` is not set, then this
is always equal to 'to_stream_id'.
"""
# A limit can only be applied when querying for a single user ID / device ID tuple.
if limit:
if not device_ids:
if not user_ids:
logger.warning("No users provided upon querying for device IDs")
return {}, to_stream_id

if len(user_ids) > 1:
if device_id is not None:
raise AssertionError(
"Programming error: _get_new_device_messages was passed 'limit' "
"but not device_ids. This could lead to querying multiple user ID "
"/ device ID pairs, which is not compatible with 'limit'"
"Programming error: 'device_id' cannot be supplied to "
"_get_device_messages when >1 user_id has been provided"
)

if len(user_ids) > 1 or len(device_ids) > 1:
# A limit can only be applied when querying for a single user ID / device ID tuple.
if limit:
raise AssertionError(
"Programming error: _get_new_device_messages was passed 'limit' "
"with >1 user id/device id pair"
"Programming error: _get_device_messages was passed 'limit' "
"with >1 user_id"
)

user_ids_to_query: Set[str] = set()
device_ids_to_query: Set[str] = set()

if device_ids is not None:
# If a collection of device IDs were passed, use them to filter results.
# Note that a device ID could be an empty str
if device_id is not None:
# If a device ID was passed, use it to filter results.
# Otherwise, device IDs will be derived from the given collection of user IDs.
device_ids_to_query.update(device_ids)
device_ids_to_query.add(device_id)

# Determine which users have devices with pending messages
for user_id in user_ids:
@@ -290,7 +303,7 @@ def get_device_messages_txn(txn: LoggingTransaction):
# If a list of device IDs was not provided, retrieve all devices IDs
# for the given users. We explicitly do not query hidden devices, as
# hidden devices should not receive to-device messages.
if not device_ids:
if not device_ids_to_query:
user_device_dicts = self.db_pool.simple_select_many_txn(
txn,
table="devices",
@@ -304,7 +317,7 @@ def get_device_messages_txn(txn: LoggingTransaction):
{row["device_id"] for row in user_device_dicts}
)

if not user_ids_to_query or not device_ids_to_query:
if not device_ids_to_query:
# We've ended up with no devices to query.
return {}, to_stream_id

@@ -335,16 +348,17 @@ def get_device_messages_txn(txn: LoggingTransaction):
)

# If a limit was provided, limit the data retrieved from the database
if limit:
if limit is not None:
sql += "LIMIT ?"
sql_args += (limit,)

txn.execute(sql, sql_args)

# Create and fill a dictionary of (user ID, device ID) -> list of messages
# intended for each device.
last_processed_stream_pos = to_stream_id
recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {}
for message_count, row in enumerate(txn, start=1):
for row in txn:
last_processed_stream_pos = row[0]
recipient_user_id = row[1]
recipient_device_id = row[2]
@@ -355,13 +369,14 @@ def get_device_messages_txn(txn: LoggingTransaction):
(recipient_user_id, recipient_device_id), []
).append(message_dict)

if limit and message_count == limit:
# We ended up hitting the message limit. There may be more messages to retrieve.
# Return what we have, as well as the last stream position that was processed.
#
# The caller is expected to set this as the lower (exclusive) bound
# for the next query of this device.
return recipient_device_to_messages, last_processed_stream_pos
if limit is not None and txn.rowcount == limit:
# We ended up bumping up against the message limit. There may be more messages
# to retrieve. Return what we have, as well as the last stream position that
# was processed.
#
# The caller is expected to set this as the lower (exclusive) bound
# for the next query of this device.
return recipient_device_to_messages, last_processed_stream_pos

# The limit was not reached, thus we know that recipient_device_to_messages
# contains all to-device messages for the given device and stream id range.
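
One point that may be worth double-checking in the limit detection above: DB-API drivers are not required to report a meaningful `rowcount` for `SELECT` statements, and Python's built-in `sqlite3` module reports `-1`. Whether that affects `txn.rowcount` here depends on the underlying driver. The sketch below, using a hypothetical `inbox` table rather than the real schema, shows the behaviour with plain `sqlite3` and an explicit counter as an alternative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical table standing in for a per-device message inbox.
conn.execute("CREATE TABLE inbox (stream_id INTEGER, message TEXT)")
conn.executemany(
    "INSERT INTO inbox VALUES (?, ?)", [(i, f"m{i}") for i in range(1, 6)]
)

limit = 3
cur = conn.execute(
    "SELECT stream_id, message FROM inbox ORDER BY stream_id ASC LIMIT ?",
    (limit,),
)

# Count rows while iterating instead of relying on cursor.rowcount.
rows_seen = 0
last_stream_id = None
for stream_id, _message in cur:
    rows_seen += 1
    last_stream_id = stream_id

hit_limit = rows_seen == limit
# sqlite3 does not report the number of rows produced by a SELECT.
select_rowcount = cur.rowcount
```

Counting rows during iteration gives the same answer on any driver, at the cost of one extra local variable.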
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.