This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Track device list updates per room. #12321

Merged · 30 commits · Apr 4, 2022
Commits
b5a9c6b
Operate on room IDs
erikjohnston Mar 25, 2022
6fa639e
Don't copy the hosts list
erikjohnston Mar 28, 2022
ecf98b9
Write to a new `device_lists_changes_in_room` table.
erikjohnston Mar 28, 2022
c5dd83f
Convert to doing everything in a transaction
erikjohnston Mar 28, 2022
bb44214
Track if we've calculated remote hosts
erikjohnston Mar 28, 2022
98fceb3
We actually don't want stream_id to be unique
erikjohnston Mar 28, 2022
eda0e64
Handle room pokes that haven't been converted to outbound pokes
erikjohnston Mar 29, 2022
c7790ab
Deduplicate outbound pokes
erikjohnston Mar 29, 2022
6e9b31a
Add a config option that allows using new code path
erikjohnston Mar 29, 2022
3b2ab93
Add tests for new code path
erikjohnston Mar 29, 2022
f8af30f
Newsfile
erikjohnston Mar 29, 2022
7266580
Fix tests
erikjohnston Mar 29, 2022
f24b70b
Merge remote-tracking branch 'origin/develop' into erikj/device_list_…
erikjohnston Mar 31, 2022
8bd8ee2
Update synapse/storage/schema/main/delta/69/01device_list_oubound_by_…
erikjohnston Apr 4, 2022
56f0913
Update synapse/storage/schema/main/delta/69/01device_list_oubound_by_…
erikjohnston Apr 4, 2022
90d41a0
Encode opentracing context just once.
erikjohnston Apr 4, 2022
c470a12
Rename var
erikjohnston Apr 4, 2022
d030062
Remove `if not room_ids` check.
erikjohnston Apr 4, 2022
ad5d46b
Add unique index
erikjohnston Apr 4, 2022
d5031b0
Note lack of foreign key constraint
erikjohnston Apr 4, 2022
28dacc8
Add comment about stream_id duplicates
erikjohnston Apr 4, 2022
f48527f
Update synapse_port_db
erikjohnston Apr 4, 2022
bd45f19
Inequality the wrong way round
erikjohnston Apr 4, 2022
dee8f55
Add note about 'num_stream_ids'
erikjohnston Apr 4, 2022
3574541
Merge remote-tracking branch 'origin/develop' into erikj/device_list_…
erikjohnston Apr 4, 2022
cf04f1a
Use different stream IDs for device_list_outbound_pokes
erikjohnston Apr 4, 2022
89e10d7
Correctly order device list stream updates
erikjohnston Apr 4, 2022
e54d2d4
Wake up replication after adding outbound pokes
erikjohnston Apr 4, 2022
7d79dee
Apply suggestions from code review
erikjohnston Apr 4, 2022
b61c5c7
Remove get_users_who_share_room_with_user stub in test
erikjohnston Apr 4, 2022
76 changes: 75 additions & 1 deletion synapse/handlers/device.py
@@ -37,7 +37,10 @@
     SynapseError,
 )
 from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.types import (
     JsonDict,
     StreamToken,
@@ -278,6 +281,16 @@ def __init__(self, hs: "HomeServer"):

         hs.get_distributor().observe("user_left_room", self.user_left_room)

+        # Whether `_handle_new_device_update_async` is currently processing.
+        self._handle_new_device_update_is_processing = False
+
+        # If a new device update may have happened while the loop was
+        # processing.
+        self._handle_new_device_update_new_data = False
+
+        # On start up check if there are any updates pending.
+        hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)
+
     def _check_device_name_length(self, name: Optional[str]) -> None:
         """
         Checks whether a device name is longer than the maximum allowed length.
@@ -503,6 +516,9 @@ async def notify_device_update(
             "device_list_key", position, users={user_id}, rooms=room_ids
         )

+        # We may need to do some processing asynchronously.
+        self._handle_new_device_update_async()
+
         if hosts:
             logger.info(
                 "Sending device list update notif for %r to: %r", user_id, hosts
@@ -618,6 +634,64 @@ async def rehydrate_device(

         return {"success": True}

+    @wrap_as_background_process("_handle_new_device_update_async")
+    async def _handle_new_device_update_async(self) -> None:
+        """Called when we have a new local device list update that we need to
+        send out over federation.
+
+        This happens in the background so as not to block the original request
+        that generated the device update.
+        """
+        if self._handle_new_device_update_is_processing:
+            self._handle_new_device_update_new_data = True
+            return
+
+        self._handle_new_device_update_is_processing = True
+
+        try:
+            while True:
+                self._handle_new_device_update_new_data = False
+                rows = await self.store.get_uncoverted_outbound_room_pokes()
+                if not rows:
+                    # If the DB returned nothing then there is nothing left to
+                    # do, *unless* a new device list update happened during the
+                    # DB query.
+                    if self._handle_new_device_update_new_data:
+                        continue
+                    else:
+                        return
+
+                for user_id, device_id, room_id, stream_id, opentracing_context in rows:
+                    joined_user_ids = await self.store.get_users_in_room(room_id)
+                    hosts = {get_domain_from_id(u) for u in joined_user_ids}
+                    hosts.discard(self.server_name)
+
+                    await self.store.add_device_list_outbound_pokes(
+                        user_id=user_id,
+                        device_id=device_id,
+                        room_id=room_id,
+                        stream_id=stream_id,
+                        hosts=hosts,
+                        context=opentracing_context,
+                    )
+
+                    if hosts:
+                        logger.info(
+                            "Sending device list update notif for %r to: %r",
+                            user_id,
+                            hosts,
+                        )
+                        for host in hosts:
+                            self.federation_sender.send_device_messages(
+                                host, immediate=False
+                            )
+                            log_kv(
+                                {"message": "sent device update to host", "host": host}
+                            )
+
+        finally:
+            self._handle_new_device_update_is_processing = False
+

 def _update_device_from_client_ips(
     device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
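The control flow above is a common single-consumer pattern: only one invocation loops at a time, and invocations that arrive mid-loop set a flag so the active loop makes another pass before exiting, guaranteeing no update is missed. A stripped-down, standalone sketch of the same pattern (hypothetical class and method names, no Synapse dependencies):

import asyncio

class DeviceUpdateProcessor:
    """Minimal sketch of the dedupe pattern in
    `_handle_new_device_update_async`: at most one processing loop runs,
    and updates arriving mid-loop trigger one more pass."""

    def __init__(self) -> None:
        self._is_processing = False
        self._new_data = False

    async def _fetch_batch(self) -> list:
        # Stand-in for `get_uncoverted_outbound_room_pokes`.
        return []

    async def handle_new_update(self) -> None:
        if self._is_processing:
            # A loop is already running; ask it to do another pass.
            self._new_data = True
            return

        self._is_processing = True
        try:
            while True:
                self._new_data = False
                rows = await self._fetch_batch()
                if not rows:
                    # Nothing left, *unless* an update raced with the fetch.
                    # Safe in single-threaded asyncio: the flags only change
                    # between awaits.
                    if self._new_data:
                        continue
                    return
                # ... convert each row into outbound pokes here ...
        finally:
            self._is_processing = False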
81 changes: 80 additions & 1 deletion synapse/storage/databases/main/devices.py
@@ -1618,7 +1618,7 @@ def _add_device_outbound_poke_to_stream_txn(
         user_id: str,
         device_ids: Iterable[str],
         hosts: Collection[str],
-        stream_ids: List[str],
+        stream_ids: List[int],
         context: Dict[str, str],
     ) -> None:
         for host in hosts:
@@ -1693,3 +1693,82 @@ def _add_device_outbound_room_poke_txn(
                 for device_id, stream_id in zip(device_ids, stream_ids)
             ],
         )
+
+    async def get_uncoverted_outbound_room_pokes(
+        self, limit: int = 10
+    ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
Review comment (Contributor):
I think the return type here is incorrect: the optional dict should be Optional[str]. (Presumably we json-decode the string elsewhere when needed.)

Xref #12485 (comment)

"""Get device list changes by room that have not yet been handled and
written to `device_lists_outbound_pokes`.

Returns:
A list of user ID, device ID, room ID, stream ID and optional opentracing context.
"""

sql = """
SELECT user_id, device_id, room_id, stream_id, opentracing_context
FROM device_lists_changes_in_room
WHERE NOT converted_to_destinations
ORDER BY stream_id
LIMIT ?
"""

def get_uncoverted_outbound_room_pokes_txn(txn):
txn.execute(sql, (limit,))
return txn.fetchall()

return await self.db_pool.runInteraction(
"get_uncoverted_outbound_room_pokes", get_uncoverted_outbound_room_pokes_txn
)
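For reference, here is a sketch of the `device_lists_changes_in_room` schema implied by the queries in this file and by the commit messages above ("Add unique index", "Note lack of foreign key constraint", "We actually don't want stream_id to be unique"). The column names come from the diff; the types, index shape, and comments are assumptions — the authoritative DDL lives in the (truncated) `69/01device_list_oubound_by_…` delta file:

# Hypothetical reconstruction -- not the actual delta file.
DEVICE_LISTS_CHANGES_IN_ROOM_DDL = """
    CREATE TABLE device_lists_changes_in_room (
        user_id TEXT NOT NULL,
        device_id TEXT NOT NULL,
        room_id TEXT NOT NULL,
        -- Deliberately not unique on its own: one update can produce rows
        -- in several rooms sharing a stream_id.
        stream_id BIGINT NOT NULL,
        -- Set once the row has been fanned out into
        -- device_lists_outbound_pokes.
        converted_to_destinations BOOLEAN NOT NULL,
        -- JSON-encoded opentracing context, encoded just once per update.
        opentracing_context TEXT
        -- Note: no foreign key constraints, per the commit above.
    );

    CREATE UNIQUE INDEX device_lists_changes_in_room_by_id
        ON device_lists_changes_in_room (user_id, device_id, room_id, stream_id);
"""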

+    async def add_device_list_outbound_pokes(
+        self,
+        user_id: str,
+        device_id: str,
+        room_id: str,
+        stream_id: int,
+        hosts: Collection[str],
+        context: Optional[Dict[str, str]],
+    ) -> None:
+        """Queue the device update to be sent to the given set of hosts,
+        calculated from the room ID.
+
+        Marks the associated row in `device_lists_changes_in_room` as handled.
+        """
+
+        def add_device_list_outbound_pokes_txn(txn, stream_ids: List[int]):
+            if hosts:
+                self._add_device_outbound_poke_to_stream_txn(
+                    txn,
+                    user_id=user_id,
+                    device_ids=[device_id],
+                    hosts=hosts,
+                    stream_ids=stream_ids,
+                    context=context,
+                )
+
+            self.db_pool.simple_update_txn(
+                txn,
+                table="device_lists_changes_in_room",
+                keyvalues={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                    "stream_id": stream_id,
+                    "room_id": room_id,
+                },
+                updatevalues={"converted_to_destinations": True},
Review comment (Member):
this might be a silly question, but why do we do it this way, instead of keeping track of the point we've processed up to?

Reply (Member Author):
Err, we could. I don't think I had much reasoning beyond not having to worry about extra tables or handling the duplicate stream IDs.

I've also been thinking about how you'd distribute this across multiple workers, but I'm not sure whether this helps with that or not 🤷

Reply (Member):
mmmhmm. Generally I prefer to keep our tables append-only where possible (it saves a world of caching problems later on), but fair enough.

+            )
+
+        if not hosts:
+            # If there are no hosts then we don't try and generate stream IDs.
+            return await self.db_pool.runInteraction(
+                "add_device_list_outbound_pokes",
+                add_device_list_outbound_pokes_txn,
+                [],
+            )
+
+        async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
+            return await self.db_pool.runInteraction(
+                "add_device_list_outbound_pokes",
+                add_device_list_outbound_pokes_txn,
+                stream_ids,
+            )
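The final method leans on Synapse's stream-ID machinery: `get_next_mult(n)` is an async context manager that reserves `n` fresh stream IDs and only advances the externally visible stream position once the `with` block — and therefore the transaction that writes rows using those IDs — has completed. A toy model of that interface, with a hypothetical `ToyIdGenerator` standing in for the real generator (which also coordinates across workers and tracks in-flight IDs):

from contextlib import asynccontextmanager
from typing import AsyncIterator, List

class ToyIdGenerator:
    """Toy stand-in for the multi-ID stream generator used above."""

    def __init__(self, start: int = 0) -> None:
        self._current = start

    @asynccontextmanager
    async def get_next_mult(self, n: int) -> AsyncIterator[List[int]]:
        # Allocate a contiguous block of n IDs.
        ids = list(range(self._current + 1, self._current + n + 1))
        self._current += n
        try:
            # The caller writes rows using these IDs inside this block.
            yield ids
        finally:
            # The real generator marks the IDs as persisted here, which is
            # what lets replication consumers advance past them.
            pass

# Usage, mirroring add_device_list_outbound_pokes:
#
#     id_gen = ToyIdGenerator()
#     async with id_gen.get_next_mult(len(hosts)) as stream_ids:
#         await run_transaction(stream_ids)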