Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make StreamIdGen get_next and get_next_mult async #8161

Merged
merged 3 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8161.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `StreamIdGenerator` and `MultiWriterIdGenerator` to have the same interface.
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ async def add_account_data_to_room(
"""
content_json = json_encoder.encode(content)

with self._account_data_id_gen.get_next() as next_id:
with await self._account_data_id_gen.get_next() as next_id:
# no need to lock here as room_account_data has a unique constraint
# on (user_id, room_id, account_data_type) so simple_upsert will
# retry if there is a conflict.
Expand Down Expand Up @@ -384,7 +384,7 @@ async def add_account_data_for_user(
"""
content_json = json_encoder.encode(content)

with self._account_data_id_gen.get_next() as next_id:
with await self._account_data_id_gen.get_next() as next_id:
# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def add_messages_txn(txn, now_ms, stream_id):
rows.append((destination, stream_id, now_ms, edu_json))
txn.executemany(sql, rows)

with self._device_inbox_id_gen.get_next() as stream_id:
with await self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
Expand Down Expand Up @@ -411,7 +411,7 @@ def add_messages_txn(txn, now_ms, stream_id):
txn, stream_id, local_messages_by_user_then_device
)

with self._device_inbox_id_gen.get_next() as stream_id:
with await self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_from_remote_to_device_inbox",
Expand Down
8 changes: 5 additions & 3 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ async def add_user_signature_change_to_streams(
THe new stream ID.
"""

with self._device_list_id_gen.get_next() as stream_id:
with await self._device_list_id_gen.get_next() as stream_id:
await self.db_pool.runInteraction(
"add_user_sig_change_to_streams",
self._add_user_signature_change_txn,
Expand Down Expand Up @@ -1146,7 +1146,9 @@ async def add_device_change_to_streams(
if not device_ids:
return

with self._device_list_id_gen.get_next_mult(len(device_ids)) as stream_ids:
with await self._device_list_id_gen.get_next_mult(
len(device_ids)
) as stream_ids:
await self.db_pool.runInteraction(
"add_device_change_to_stream",
self._add_device_change_to_stream_txn,
Expand All @@ -1159,7 +1161,7 @@ async def add_device_change_to_streams(
return stream_ids[-1]

context = get_active_span_text_map()
with self._device_list_id_gen.get_next_mult(
with await self._device_list_id_gen.get_next_mult(
len(hosts) * len(device_ids)
) as stream_ids:
await self.db_pool.runInteraction(
Expand Down
43 changes: 23 additions & 20 deletions synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ def delete_e2e_keys_by_device_txn(txn):
"delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
)

def _set_e2e_cross_signing_key_txn(self, txn, user_id, key_type, key):
def _set_e2e_cross_signing_key_txn(self, txn, user_id, key_type, key, stream_id):
"""Set a user's cross-signing key.

Args:
Expand All @@ -658,6 +658,7 @@ def _set_e2e_cross_signing_key_txn(self, txn, user_id, key_type, key):
for a master key, 'self_signing' for a self-signing key, or
'user_signing' for a user-signing key
key (dict): the key data
stream_id (int)
"""
# the 'key' dict will look something like:
# {
Expand Down Expand Up @@ -695,37 +696,39 @@ def _set_e2e_cross_signing_key_txn(self, txn, user_id, key_type, key):
)

# and finally, store the key itself
with self._cross_signing_id_gen.get_next() as stream_id:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have never been in a _txn function

self.db_pool.simple_insert_txn(
txn,
"e2e_cross_signing_keys",
values={
"user_id": user_id,
"keytype": key_type,
"keydata": json_encoder.encode(key),
"stream_id": stream_id,
},
)
self.db_pool.simple_insert_txn(
txn,
"e2e_cross_signing_keys",
values={
"user_id": user_id,
"keytype": key_type,
"keydata": json_encoder.encode(key),
"stream_id": stream_id,
},
)

self._invalidate_cache_and_stream(
txn, self._get_bare_e2e_cross_signing_keys, (user_id,)
)

def set_e2e_cross_signing_key(self, user_id, key_type, key):
async def set_e2e_cross_signing_key(self, user_id, key_type, key):
"""Set a user's cross-signing key.

Args:
user_id (str): the user to set the user-signing key for
key_type (str): the type of cross-signing key to set
key (dict): the key data
"""
return self.db_pool.runInteraction(
"add_e2e_cross_signing_key",
self._set_e2e_cross_signing_key_txn,
user_id,
key_type,
key,
)

with await self._cross_signing_id_gen.get_next() as stream_id:
return await self.db_pool.runInteraction(
"add_e2e_cross_signing_key",
self._set_e2e_cross_signing_key_txn,
user_id,
key_type,
key,
stream_id,
)

def store_e2e_cross_signing_signatures(self, user_id, signatures):
"""Stores cross-signing signatures.
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ async def _persist_events_and_state_updates(
# Note: Multiple instances of this function cannot be in flight at
# the same time for the same room.
if backfilled:
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
stream_ordering_manager = await self._backfill_id_gen.get_next_mult(
len(events_and_contexts)
)
else:
stream_ordering_manager = self._stream_id_gen.get_next_mult(
stream_ordering_manager = await self._stream_id_gen.get_next_mult(
len(events_and_contexts)
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/group_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,7 @@ def _register_user_group_membership_txn(txn, next_id):

return next_id

with self._group_updates_id_gen.get_next() as next_id:
with await self._group_updates_id_gen.get_next() as next_id:
res = await self.db_pool.runInteraction(
"register_user_group_membership",
_register_user_group_membership_txn,
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

class PresenceStore(SQLBaseStore):
async def update_presence(self, presence_states):
stream_ordering_manager = self._presence_id_gen.get_next_mult(
stream_ordering_manager = await self._presence_id_gen.get_next_mult(
len(presence_states)
)

Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ async def add_push_rule(
) -> None:
conditions_json = json_encoder.encode(conditions)
actions_json = json_encoder.encode(actions)
with self._push_rules_stream_id_gen.get_next() as stream_id:
with await self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

if before or after:
Expand Down Expand Up @@ -560,7 +560,7 @@ def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
txn, stream_id, event_stream_ordering, user_id, rule_id, op="DELETE"
)

with self._push_rules_stream_id_gen.get_next() as stream_id:
with await self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

await self.db_pool.runInteraction(
Expand All @@ -571,7 +571,7 @@ def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
)

async def set_push_rule_enabled(self, user_id, rule_id, enabled) -> None:
with self._push_rules_stream_id_gen.get_next() as stream_id:
with await self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

await self.db_pool.runInteraction(
Expand Down Expand Up @@ -646,7 +646,7 @@ def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):
data={"actions": actions_json},
)

with self._push_rules_stream_id_gen.get_next() as stream_id:
with await self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

await self.db_pool.runInteraction(
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ async def add_pusher(
last_stream_ordering,
profile_tag="",
) -> None:
with self._pushers_id_gen.get_next() as stream_id:
with await self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so simple_upsert will retry
await self.db_pool.simple_upsert(
Expand Down Expand Up @@ -344,7 +344,7 @@ def delete_pusher_txn(txn, stream_id):
},
)

with self._pushers_id_gen.get_next() as stream_id:
with await self._pushers_id_gen.get_next() as stream_id:
await self.db_pool.runInteraction(
"delete_pusher", delete_pusher_txn, stream_id
)
3 changes: 1 addition & 2 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,7 @@ def graph_to_linear(txn):
"insert_receipt_conv", graph_to_linear
)

stream_id_manager = self._receipts_id_gen.get_next()
with stream_id_manager as stream_id:
with await self._receipts_id_gen.get_next() as stream_id:
event_ts = await self.db_pool.runInteraction(
"insert_linearized_receipt",
self.insert_linearized_receipt_txn,
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ def store_room_txn(txn, next_id):
},
)

with self._public_room_id_gen.get_next() as next_id:
with await self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"store_room_txn", store_room_txn, next_id
)
Expand Down Expand Up @@ -1196,7 +1196,7 @@ def set_room_is_public_txn(txn, next_id):
},
)

with self._public_room_id_gen.get_next() as next_id:
with await self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"set_room_is_public", set_room_is_public_txn, next_id
)
Expand Down Expand Up @@ -1276,7 +1276,7 @@ def set_room_is_public_appservice_txn(txn, next_id):
},
)

with self._public_room_id_gen.get_next() as next_id:
with await self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"set_room_is_public_appservice",
set_room_is_public_appservice_txn,
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def add_tag_txn(txn, next_id):
)
self._update_revision_txn(txn, user_id, room_id, next_id)

with self._account_data_id_gen.get_next() as next_id:
with await self._account_data_id_gen.get_next() as next_id:
await self.db_pool.runInteraction("add_tag", add_tag_txn, next_id)

self.get_tags_for_user.invalidate((user_id,))
Expand All @@ -232,7 +232,7 @@ def remove_tag_txn(txn, next_id):
txn.execute(sql, (user_id, room_id, tag))
self._update_revision_txn(txn, user_id, room_id, next_id)

with self._account_data_id_gen.get_next() as next_id:
with await self._account_data_id_gen.get_next() as next_id:
await self.db_pool.runInteraction("remove_tag", remove_tag_txn, next_id)

self.get_tags_for_user.invalidate((user_id,))
Expand Down
10 changes: 5 additions & 5 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class StreamIdGenerator(object):
upwards, -1 to grow downwards.

Usage:
with stream_id_gen.get_next() as stream_id:
with await stream_id_gen.get_next() as stream_id:
# ... persist event ...
"""

Expand All @@ -95,10 +95,10 @@ def __init__(self, db_conn, table, column, extra_tables=[], step=1):
)
self._unfinished_ids = deque() # type: Deque[int]

def get_next(self):
async def get_next(self):
"""
Usage:
with stream_id_gen.get_next() as stream_id:
with await stream_id_gen.get_next() as stream_id:
# ... persist event ...
"""
with self._lock:
Expand All @@ -117,10 +117,10 @@ def manager():

return manager()

def get_next_mult(self, n):
async def get_next_mult(self, n):
"""
Usage:
with stream_id_gen.get_next(n) as stream_ids:
with await stream_id_gen.get_next(n) as stream_ids:
# ... persist events ...
"""
with self._lock:
Expand Down