Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert some of the data store to async #7976

Merged
merged 5 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7976.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
92 changes: 44 additions & 48 deletions synapse/storage/data_stores/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
# limitations under the License.

import logging
from typing import List

from canonicaljson import json

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
from synapse.storage.database import Database
Expand Down Expand Up @@ -166,8 +165,9 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):

return {"notify_count": notify_count, "highlight_count": highlight_count}

@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
async def get_push_action_users_in_range(
self, min_stream_ordering, max_stream_ordering
):
def f(txn):
sql = (
"SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
Expand All @@ -176,26 +176,28 @@ def f(txn):
txn.execute(sql, (min_stream_ordering, max_stream_ordering))
return [r[0] for r in txn]

ret = yield self.db.runInteraction("get_push_action_users_in_range", f)
ret = await self.db.runInteraction("get_push_action_users_in_range", f)
return ret

@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range_for_http(
self, user_id, min_stream_ordering, max_stream_ordering, limit=20
):
async def get_unread_push_actions_for_user_in_range_for_http(
self,
user_id: str,
min_stream_ordering: int,
max_stream_ordering: int,
limit: int = 20,
) -> List[dict]:
"""Get a list of the most recent unread push actions for a given user,
within the given stream ordering range. Called by the httppusher.

Args:
user_id (str): The user to fetch push actions for.
min_stream_ordering(int): The exclusive lower bound on the
user_id: The user to fetch push actions for.
min_stream_ordering: The exclusive lower bound on the
stream ordering of event push actions to fetch.
max_stream_ordering(int): The inclusive upper bound on the
max_stream_ordering: The inclusive upper bound on the
stream ordering of event push actions to fetch.
limit (int): The maximum number of rows to return.
limit: The maximum number of rows to return.
Returns:
A promise which resolves to a list of dicts with the keys "event_id",
"room_id", "stream_ordering", "actions".
A list of dicts with the keys "event_id", "room_id", "stream_ordering", "actions".
The list will be ordered by ascending stream_ordering.
The list will have between 0~limit entries.
"""
Expand Down Expand Up @@ -228,7 +230,7 @@ def get_after_receipt(txn):
txn.execute(sql, args)
return txn.fetchall()

after_read_receipt = yield self.db.runInteraction(
after_read_receipt = await self.db.runInteraction(
"get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
)

Expand Down Expand Up @@ -256,7 +258,7 @@ def get_no_receipt(txn):
txn.execute(sql, args)
return txn.fetchall()

no_read_receipt = yield self.db.runInteraction(
no_read_receipt = await self.db.runInteraction(
"get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
)

Expand All @@ -280,23 +282,25 @@ def get_no_receipt(txn):
# one of the subqueries may have hit the limit.
return notifs[:limit]

@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range_for_email(
self, user_id, min_stream_ordering, max_stream_ordering, limit=20
):
async def get_unread_push_actions_for_user_in_range_for_email(
self,
user_id: str,
min_stream_ordering: int,
max_stream_ordering: int,
limit: int = 20,
) -> List[dict]:
"""Get a list of the most recent unread push actions for a given user,
within the given stream ordering range. Called by the emailpusher

Args:
user_id (str): The user to fetch push actions for.
min_stream_ordering(int): The exclusive lower bound on the
user_id: The user to fetch push actions for.
min_stream_ordering: The exclusive lower bound on the
stream ordering of event push actions to fetch.
max_stream_ordering(int): The inclusive upper bound on the
max_stream_ordering: The inclusive upper bound on the
stream ordering of event push actions to fetch.
limit (int): The maximum number of rows to return.
limit: The maximum number of rows to return.
Returns:
A promise which resolves to a list of dicts with the keys "event_id",
"room_id", "stream_ordering", "actions", "received_ts".
A list of dicts with the keys "event_id", "room_id", "stream_ordering", "actions", "received_ts".
The list will be ordered by descending received_ts.
The list will have between 0~limit entries.
"""
Expand Down Expand Up @@ -328,7 +332,7 @@ def get_after_receipt(txn):
txn.execute(sql, args)
return txn.fetchall()

after_read_receipt = yield self.db.runInteraction(
after_read_receipt = await self.db.runInteraction(
"get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
)

Expand Down Expand Up @@ -356,7 +360,7 @@ def get_no_receipt(txn):
txn.execute(sql, args)
return txn.fetchall()

no_read_receipt = yield self.db.runInteraction(
no_read_receipt = await self.db.runInteraction(
"get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
)

Expand Down Expand Up @@ -461,17 +465,13 @@ def _add_push_actions_to_staging_txn(txn):
"add_push_actions_to_staging", _add_push_actions_to_staging_txn
)

@defer.inlineCallbacks
def remove_push_actions_from_staging(self, event_id):
async def remove_push_actions_from_staging(self, event_id: str) -> None:
"""Called if we failed to persist the event to ensure that stale push
actions don't build up in the DB

Args:
event_id (str)
"""

try:
res = yield self.db.simple_delete(
res = await self.db.simple_delete(
table="event_push_actions_staging",
keyvalues={"event_id": event_id},
desc="remove_push_actions_from_staging",
Expand Down Expand Up @@ -606,8 +606,7 @@ def _find_first_stream_ordering_after_ts_txn(txn, ts):

return range_end

@defer.inlineCallbacks
def get_time_of_last_push_action_before(self, stream_ordering):
async def get_time_of_last_push_action_before(self, stream_ordering):
def f(txn):
sql = (
"SELECT e.received_ts"
Expand All @@ -620,7 +619,7 @@ def f(txn):
txn.execute(sql, (stream_ordering,))
return txn.fetchone()

result = yield self.db.runInteraction("get_time_of_last_push_action_before", f)
result = await self.db.runInteraction("get_time_of_last_push_action_before", f)
return result[0] if result else None


Expand Down Expand Up @@ -650,8 +649,7 @@ def __init__(self, database: Database, db_conn, hs):
self._start_rotate_notifs, 30 * 60 * 1000
)

@defer.inlineCallbacks
def get_push_actions_for_user(
async def get_push_actions_for_user(
self, user_id, before=None, limit=50, only_highlight=False
):
def f(txn):
Expand Down Expand Up @@ -682,18 +680,17 @@ def f(txn):
txn.execute(sql, args)
return self.db.cursor_to_dict(txn)

push_actions = yield self.db.runInteraction("get_push_actions_for_user", f)
push_actions = await self.db.runInteraction("get_push_actions_for_user", f)
for pa in push_actions:
pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
return push_actions

@defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self):
async def get_latest_push_action_stream_ordering(self):
def f(txn):
txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
return txn.fetchone()

result = yield self.db.runInteraction(
result = await self.db.runInteraction(
"get_latest_push_action_stream_ordering", f
)
return result[0] or 0
Expand Down Expand Up @@ -747,8 +744,7 @@ def _remove_old_push_actions_before_txn(
def _start_rotate_notifs(self):
return run_as_background_process("rotate_notifs", self._rotate_notifs)

@defer.inlineCallbacks
def _rotate_notifs(self):
async def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
return
self._doing_notif_rotation = True
Expand All @@ -757,12 +753,12 @@ def _rotate_notifs(self):
while True:
logger.info("Rotating notifications")

caught_up = yield self.db.runInteraction(
caught_up = await self.db.runInteraction(
"_rotate_notifs", self._rotate_notifs_txn
)
if caught_up:
break
yield self.hs.get_clock().sleep(self._rotate_delay)
await self.hs.get_clock().sleep(self._rotate_delay)
finally:
self._doing_notif_rotation = False

Expand Down
Loading