Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Track device list updates per room. #12321

Merged
merged 30 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b5a9c6b
Operate on room IDs
erikjohnston Mar 25, 2022
6fa639e
Don't copy the hosts list
erikjohnston Mar 28, 2022
ecf98b9
Write to a new `device_lists_changes_in_room` table.
erikjohnston Mar 28, 2022
c5dd83f
Convert to doing everything in a transaction
erikjohnston Mar 28, 2022
bb44214
Track if we've calculated remote hosts
erikjohnston Mar 28, 2022
98fceb3
We actually don't want stream_id to be unique
erikjohnston Mar 28, 2022
eda0e64
Handle room pokes that haven't been converted to outbound pokes
erikjohnston Mar 29, 2022
c7790ab
Deduplicate outbound pokes
erikjohnston Mar 29, 2022
6e9b31a
Add a config option that allows using new code path
erikjohnston Mar 29, 2022
3b2ab93
Add tests for new code path
erikjohnston Mar 29, 2022
f8af30f
Newsfile
erikjohnston Mar 29, 2022
7266580
Fix tests
erikjohnston Mar 29, 2022
f24b70b
Merge remote-tracking branch 'origin/develop' into erikj/device_list_…
erikjohnston Mar 31, 2022
8bd8ee2
Update synapse/storage/schema/main/delta/69/01device_list_oubound_by_…
erikjohnston Apr 4, 2022
56f0913
Update synapse/storage/schema/main/delta/69/01device_list_oubound_by_…
erikjohnston Apr 4, 2022
90d41a0
Encode opentracing context just once.
erikjohnston Apr 4, 2022
c470a12
Rename var
erikjohnston Apr 4, 2022
d030062
Remove `if not room_ids` check.
erikjohnston Apr 4, 2022
ad5d46b
Add unique index
erikjohnston Apr 4, 2022
d5031b0
Note lack of foreign key constraint
erikjohnston Apr 4, 2022
28dacc8
Add comment about stream_id duplicates
erikjohnston Apr 4, 2022
f48527f
Update synapse_port_db
erikjohnston Apr 4, 2022
bd45f19
Inequality the wrong way round
erikjohnston Apr 4, 2022
dee8f55
Add note about 'num_stream_ids'
erikjohnston Apr 4, 2022
3574541
Merge remote-tracking branch 'origin/develop' into erikj/device_list_…
erikjohnston Apr 4, 2022
cf04f1a
Use different stream IDs for device_list_outbound_pokes
erikjohnston Apr 4, 2022
89e10d7
Correctly order device list stream updates
erikjohnston Apr 4, 2022
e54d2d4
Wake up replication after adding otubound pokes
erikjohnston Apr 4, 2022
7d79dee
Apply suggestions from code review
erikjohnston Apr 4, 2022
b61c5c7
Remove get_users_who_share_room_with_user stub in test
erikjohnston Apr 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
"users": ["shadow_banned"],
"e2e_fallback_keys_json": ["used"],
"access_tokens": ["used"],
"device_lists_changes_in_room": ["converted_to_destinations"],
}


Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ async def notify_device_update(
room_ids = await self.store.get_rooms_for_user(user_id)

hosts: Optional[Set[str]] = None
if self.use_new_device_lists_changes_in_room:
if not self.use_new_device_lists_changes_in_room:
hosts = set()

if self.hs.is_mine_id(user_id):
Expand Down
25 changes: 16 additions & 9 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1555,6 +1555,8 @@ async def add_device_change_to_streams(

num_stream_ids = len(device_ids)
if hosts:
# The `device_lists_outbound_pokes` wants a different stream ID for
# each row, which is a row per host per device update.
richvdh marked this conversation as resolved.
Show resolved Hide resolved
num_stream_ids = len(hosts) * len(device_ids)

context = get_active_span_text_map()
Expand All @@ -1567,9 +1569,6 @@ def add_device_changes_txn(txn, stream_ids):
stream_ids,
)

if not room_ids:
return

self._add_device_outbound_room_poke_txn(
txn,
user_id,
Expand All @@ -1580,6 +1579,9 @@ def add_device_changes_txn(txn, stream_ids):
hosts_have_been_calculated=hosts is not None,
)

# If the set of hosts to send to has not been calculated yet (and so
# `hosts` is None) or there are no `hosts` to send to, then skip
# trying to persist them to the DB.
if not hosts:
return

Expand Down Expand Up @@ -1653,8 +1655,9 @@ def _add_device_outbound_poke_to_stream_txn(
)

now = self._clock.time_msec()
next_stream_id = iter(stream_ids)
stream_id_iterator = iter(stream_ids)

encoded_context = json_encoder.encode(context)
self.db_pool.simple_insert_many_txn(
txn,
table="device_lists_outbound_pokes",
Expand All @@ -1670,14 +1673,12 @@ def _add_device_outbound_poke_to_stream_txn(
values=[
(
destination,
next(next_stream_id),
next(stream_id_iterator),
user_id,
device_id,
False,
now,
json_encoder.encode(context)
if whitelisted_homeserver(destination)
else "{}",
encoded_context if whitelisted_homeserver(destination) else "{}",
)
for destination in hosts
for device_id in device_ids
Expand Down Expand Up @@ -1706,6 +1707,12 @@ def _add_device_outbound_room_poke_txn(
hosts_have_been_calculated or not self.hs.is_mine_id(user_id)
)

encoded_context = json_encoder.encode(context)

# The `device_lists_changes_in_room.stream_id` column matches the
# corresponding `stream_id` of the update in the `device_lists_stream`
# table, i.e. all rows persisted for the same device update will have
# the same `stream_id` (but different room IDs).
self.db_pool.simple_insert_many_txn(
txn,
table="device_lists_changes_in_room",
Expand All @@ -1724,7 +1731,7 @@ def _add_device_outbound_room_poke_txn(
room_id,
stream_id,
converted_to_destinations,
json_encoder.encode(context),
encoded_context,
)
for room_id in room_ids
for device_id, stream_id in zip(device_ids, stream_ids)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

Changes in SCHEMA_VERSION = 69:
- We now write to `device_lists_changes_in_room` table.
- Use sequence to generate future `application_services_txns.txn_id`s
"""


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,22 @@ CREATE TABLE device_lists_changes_in_room (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
room_id TEXT NOT NULL,
stream_id BIGINT NOT NULL, -- This matches `device_lists_stream.stream_id`
-- We track if we've calculated the hosts in the room and updated
-- `device_lists_outbound_pokes`.

-- This initialy matches `device_lists_stream.stream_id`. Note that we
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
-- delete older values from `device_lists_stream`, so we can't use a foreign
-- constraint here.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is no longer true as of cf04f1a, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That commit is about device_list_outbound_pokes? Rather than device_lists_stream or device_lists_changes_in_room?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh. confused. sorry.

--
-- The table will contain rows with the same `stream_id` but different
-- `room_id`, as for each device update we store a row per room the user is
-- joined to. Therefore `(stream_id, room_id)` gives a unique index.
stream_id BIGINT NOT NULL,

-- We have a background process which goes through this table and converts
-- entries into rows in `device_lists_outbound_pokes`. Once we have processed
-- a row, we mark it as such by setting `converted_to_destinations=TRUE`.
converted_to_destinations BOOLEAN NOT NULL,
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
opentracing_context TEXT
);

CREATE INDEX device_lists_changes_in_stream_id ON device_lists_changes_in_room(stream_id);
CREATE INDEX device_lists_changes_in_stream_id_converted ON device_lists_changes_in_room(stream_id) WHERE NOT converted_to_destinations;
CREATE UNIQUE INDEX device_lists_changes_in_stream_id ON device_lists_changes_in_room(stream_id, room_id);
CREATE INDEX device_lists_changes_in_stream_id_unconverted ON device_lists_changes_in_room(stream_id) WHERE NOT converted_to_destinations;