Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert a grab bag of database code to async/await #8195

Merged
merged 8 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8195.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
12 changes: 5 additions & 7 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,28 @@ def __init__(self, service, id, events):
self.id = id
self.events = events

def send(self, as_api):
async def send(self, as_api) -> bool:
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""Sends this transaction using the provided AS API interface.

Args:
as_api(ApplicationServiceApi): The API to use to send.
Returns:
An Awaitable which resolves to True if the transaction was sent.
True if the transaction was sent.
"""
return as_api.push_bulk(
return await as_api.push_bulk(
service=self.service, events=self.events, txn_id=self.id
)

def complete(self, store):
async def complete(self, store) -> None:
"""Completes this transaction as successful.

Marks this transaction ID on the application service and removes the
transaction contents from the database.

Args:
store: The database store to operate on.
Returns:
A Deferred which resolves to True if the transaction was completed.
"""
return store.complete_appservice_txn(service=self.service, txn_id=self.id)
await store.complete_appservice_txn(service=self.service, txn_id=self.id)


class ApplicationService(object):
Expand Down
8 changes: 5 additions & 3 deletions synapse/federation/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,21 @@ def __init__(self, datastore):
self.store = datastore

@log_function
def have_responded(self, origin, transaction):
async def have_responded(self, origin, transaction):
""" Have we already responded to a transaction with the same id and
origin?

Returns:
Deferred: Results in `None` if we have not previously responded to
Awaitable: Results in `None` if we have not previously responded to
clokep marked this conversation as resolved.
Show resolved Hide resolved
this transaction or a 2-tuple of `(int, dict)` representing the
response code and response body.
"""
if not transaction.transaction_id:
raise RuntimeError("Cannot persist a transaction with no transaction_id")

return self.store.get_received_txn_response(transaction.transaction_id, origin)
return await self.store.get_received_txn_response(
transaction.transaction_id, origin
)

@log_function
async def set_response(
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1873,8 +1873,8 @@ async def get_persisted_pdu(
else:
return None

def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
async def get_min_depth_for_context(self, context):
return await self.store.get_min_depth(context)

async def _handle_new_event(
self, origin, event, state=None, auth_events=None, backfilled=False
Expand Down
15 changes: 6 additions & 9 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async def set_appservice_state(self, service, state) -> None:
"application_services_state", {"as_id": service.id}, {"state": state}
)

def create_appservice_txn(self, service, events):
async def create_appservice_txn(self, service, events):
"""Atomically creates a new transaction for this application service
with the given list of events.

Expand Down Expand Up @@ -209,20 +209,17 @@ def _create_appservice_txn(txn):
)
return AppServiceTransaction(service=service, id=new_txn_id, events=events)

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"create_appservice_txn", _create_appservice_txn
)

def complete_appservice_txn(self, txn_id, service):
async def complete_appservice_txn(self, txn_id, service) -> None:
"""Completes an application service transaction.

Args:
txn_id(str): The transaction ID being completed.
service(ApplicationService): The application service which was sent
this transaction.
Returns:
A Deferred which resolves if this transaction was stored
successfully.
"""
txn_id = int(txn_id)

Expand Down Expand Up @@ -258,7 +255,7 @@ def _complete_appservice_txn(txn):
{"txn_id": txn_id, "as_id": service.id},
)

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"complete_appservice_txn", _complete_appservice_txn
)

Expand Down Expand Up @@ -312,13 +309,13 @@ def _get_last_txn(self, txn, service_id):
else:
return int(last_txn_id[0]) # select 'last_txn' col

def set_appservice_last_pos(self, pos):
async def set_appservice_last_pos(self, pos) -> None:
def set_appservice_last_pos_txn(txn):
txn.execute(
"UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
)

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"set_appservice_last_pos", set_appservice_last_pos_txn
)

Expand Down
12 changes: 6 additions & 6 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,15 @@ def get_new_messages_for_remote_destination_txn(txn):
)

@trace
def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
async def delete_device_msgs_for_remote(
self, destination: str, up_to_stream_id: int
) -> None:
"""Used to delete messages when the remote destination acknowledges
their receipt.

Args:
destination(str): The destination server_name
up_to_stream_id(int): Where to delete messages up to.
Returns:
A deferred that resolves when the messages have been deleted.
destination: The destination server_name
up_to_stream_id: Where to delete messages up to.
"""

def delete_messages_for_remote_destination_txn(txn):
Expand All @@ -209,7 +209,7 @@ def delete_messages_for_remote_destination_txn(txn):
)
txn.execute(sql, (destination, up_to_stream_id))

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
)

Expand Down
30 changes: 16 additions & 14 deletions synapse/storage/databases/main/e2e_room_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=Non

return sessions

def get_e2e_room_keys_multi(self, user_id, version, room_keys):
async def get_e2e_room_keys_multi(self, user_id, version, room_keys):
"""Get multiple room keys at a time. The difference between this function and
get_e2e_room_keys is that this function can be used to retrieve
multiple specific keys at a time, whereas get_e2e_room_keys is used for
Expand All @@ -166,10 +166,10 @@ def get_e2e_room_keys_multi(self, user_id, version, room_keys):
that we want to query

Returns:
Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
dict[str, dict[str, dict]]: a map of room IDs to session IDs to room key
"""

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_e2e_room_keys_multi",
self._get_e2e_room_keys_multi_txn,
user_id,
Expand Down Expand Up @@ -283,7 +283,7 @@ def _get_current_version(txn, user_id):
raise StoreError(404, "No current backup version")
return row[0]

def get_e2e_room_keys_version_info(self, user_id, version=None):
async def get_e2e_room_keys_version_info(self, user_id, version=None):
"""Get info metadata about a version of our room_keys backup.

Args:
Expand All @@ -293,7 +293,7 @@ def get_e2e_room_keys_version_info(self, user_id, version=None):
Raises:
StoreError: with code 404 if there are no e2e_room_keys_versions present
Returns:
A deferred dict giving the info metadata for this backup version, with
A dict giving the info metadata for this backup version, with
fields including:
version(str)
algorithm(str)
Expand Down Expand Up @@ -324,12 +324,12 @@ def _get_e2e_room_keys_version_info_txn(txn):
result["etag"] = 0
return result

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
)

@trace
def create_e2e_room_keys_version(self, user_id, info):
async def create_e2e_room_keys_version(self, user_id: str, info: dict) -> str:
"""Atomically creates a new version of this user's e2e_room_keys store
with the given version info.

Expand All @@ -338,7 +338,7 @@ def create_e2e_room_keys_version(self, user_id, info):
info(dict): the info about the backup version to be created

Returns:
A deferred string for the newly created version ID
The newly created version ID
"""

def _create_e2e_room_keys_version_txn(txn):
Expand All @@ -365,7 +365,7 @@ def _create_e2e_room_keys_version_txn(txn):

return new_version

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
)

Expand Down Expand Up @@ -403,13 +403,15 @@ async def update_e2e_room_keys_version(
)

@trace
def delete_e2e_room_keys_version(self, user_id, version=None):
async def delete_e2e_room_keys_version(
self, user_id: str, version: Optional[str] = None
) -> None:
"""Delete a given backup version of the user's room keys.
Doesn't delete their actual key data.

Args:
user_id(str): the user whose backup version we're deleting
version(str): Optional. the version ID of the backup version we're deleting
user_id: the user whose backup version we're deleting
version: Optional. the version ID of the backup version we're deleting
If missing, we delete the current backup version info.
Raises:
StoreError: with code 404 if there are no e2e_room_keys_versions present,
Expand All @@ -430,13 +432,13 @@ def _delete_e2e_room_keys_version_txn(txn):
keyvalues={"user_id": user_id, "version": this_version},
)

return self.db_pool.simple_update_one_txn(
self.db_pool.simple_update_one_txn(
txn,
table="e2e_room_keys_versions",
keyvalues={"user_id": user_id, "version": this_version},
updatevalues={"deleted": 1},
)

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"delete_e2e_room_keys_version", _delete_e2e_room_keys_version_txn
)
Loading