Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge commit 'b3a97d6da' into anoa/dinsic_release_1_21_x
Browse files Browse the repository at this point in the history
* commit 'b3a97d6da':
  Convert some of the data store to async. (#7976)
  • Loading branch information
anoadragon453 committed Oct 16, 2020
2 parents 684991e + b3a97d6 commit c078f9e
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 207 deletions.
1 change: 1 addition & 0 deletions changelog.d/7976.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
92 changes: 44 additions & 48 deletions synapse/storage/data_stores/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
# limitations under the License.

import logging
from typing import List

from canonicaljson import json

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
from synapse.storage.database import Database
Expand Down Expand Up @@ -166,8 +165,9 @@ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):

return {"notify_count": notify_count, "highlight_count": highlight_count}

@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
async def get_push_action_users_in_range(
self, min_stream_ordering, max_stream_ordering
):
def f(txn):
sql = (
"SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
Expand All @@ -176,26 +176,28 @@ def f(txn):
txn.execute(sql, (min_stream_ordering, max_stream_ordering))
return [r[0] for r in txn]

ret = yield self.db.runInteraction("get_push_action_users_in_range", f)
ret = await self.db.runInteraction("get_push_action_users_in_range", f)
return ret

@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range_for_http(
self, user_id, min_stream_ordering, max_stream_ordering, limit=20
):
async def get_unread_push_actions_for_user_in_range_for_http(
self,
user_id: str,
min_stream_ordering: int,
max_stream_ordering: int,
limit: int = 20,
) -> List[dict]:
"""Get a list of the most recent unread push actions for a given user,
within the given stream ordering range. Called by the httppusher.
Args:
user_id (str): The user to fetch push actions for.
min_stream_ordering(int): The exclusive lower bound on the
user_id: The user to fetch push actions for.
min_stream_ordering: The exclusive lower bound on the
stream ordering of event push actions to fetch.
max_stream_ordering(int): The inclusive upper bound on the
max_stream_ordering: The inclusive upper bound on the
stream ordering of event push actions to fetch.
limit (int): The maximum number of rows to return.
limit: The maximum number of rows to return.
Returns:
A promise which resolves to a list of dicts with the keys "event_id",
"room_id", "stream_ordering", "actions".
A list of dicts with the keys "event_id", "room_id", "stream_ordering", "actions".
The list will be ordered by ascending stream_ordering.
The list will have between 0~limit entries.
"""
Expand Down Expand Up @@ -228,7 +230,7 @@ def get_after_receipt(txn):
txn.execute(sql, args)
return txn.fetchall()

after_read_receipt = yield self.db.runInteraction(
after_read_receipt = await self.db.runInteraction(
"get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
)

Expand Down Expand Up @@ -256,7 +258,7 @@ def get_no_receipt(txn):
txn.execute(sql, args)
return txn.fetchall()

no_read_receipt = yield self.db.runInteraction(
no_read_receipt = await self.db.runInteraction(
"get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
)

Expand All @@ -280,23 +282,25 @@ def get_no_receipt(txn):
# one of the subqueries may have hit the limit.
return notifs[:limit]

@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range_for_email(
self, user_id, min_stream_ordering, max_stream_ordering, limit=20
):
async def get_unread_push_actions_for_user_in_range_for_email(
self,
user_id: str,
min_stream_ordering: int,
max_stream_ordering: int,
limit: int = 20,
) -> List[dict]:
"""Get a list of the most recent unread push actions for a given user,
within the given stream ordering range. Called by the emailpusher
Args:
user_id (str): The user to fetch push actions for.
min_stream_ordering(int): The exclusive lower bound on the
user_id: The user to fetch push actions for.
min_stream_ordering: The exclusive lower bound on the
stream ordering of event push actions to fetch.
max_stream_ordering(int): The inclusive upper bound on the
max_stream_ordering: The inclusive upper bound on the
stream ordering of event push actions to fetch.
limit (int): The maximum number of rows to return.
limit: The maximum number of rows to return.
Returns:
A promise which resolves to a list of dicts with the keys "event_id",
"room_id", "stream_ordering", "actions", "received_ts".
A list of dicts with the keys "event_id", "room_id", "stream_ordering", "actions", "received_ts".
The list will be ordered by descending received_ts.
The list will have between 0~limit entries.
"""
Expand Down Expand Up @@ -328,7 +332,7 @@ def get_after_receipt(txn):
txn.execute(sql, args)
return txn.fetchall()

after_read_receipt = yield self.db.runInteraction(
after_read_receipt = await self.db.runInteraction(
"get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
)

Expand Down Expand Up @@ -356,7 +360,7 @@ def get_no_receipt(txn):
txn.execute(sql, args)
return txn.fetchall()

no_read_receipt = yield self.db.runInteraction(
no_read_receipt = await self.db.runInteraction(
"get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
)

Expand Down Expand Up @@ -461,17 +465,13 @@ def _add_push_actions_to_staging_txn(txn):
"add_push_actions_to_staging", _add_push_actions_to_staging_txn
)

@defer.inlineCallbacks
def remove_push_actions_from_staging(self, event_id):
async def remove_push_actions_from_staging(self, event_id: str) -> None:
"""Called if we failed to persist the event to ensure that stale push
actions don't build up in the DB
Args:
event_id (str)
"""

try:
res = yield self.db.simple_delete(
res = await self.db.simple_delete(
table="event_push_actions_staging",
keyvalues={"event_id": event_id},
desc="remove_push_actions_from_staging",
Expand Down Expand Up @@ -606,8 +606,7 @@ def _find_first_stream_ordering_after_ts_txn(txn, ts):

return range_end

@defer.inlineCallbacks
def get_time_of_last_push_action_before(self, stream_ordering):
async def get_time_of_last_push_action_before(self, stream_ordering):
def f(txn):
sql = (
"SELECT e.received_ts"
Expand All @@ -620,7 +619,7 @@ def f(txn):
txn.execute(sql, (stream_ordering,))
return txn.fetchone()

result = yield self.db.runInteraction("get_time_of_last_push_action_before", f)
result = await self.db.runInteraction("get_time_of_last_push_action_before", f)
return result[0] if result else None


Expand Down Expand Up @@ -650,8 +649,7 @@ def __init__(self, database: Database, db_conn, hs):
self._start_rotate_notifs, 30 * 60 * 1000
)

@defer.inlineCallbacks
def get_push_actions_for_user(
async def get_push_actions_for_user(
self, user_id, before=None, limit=50, only_highlight=False
):
def f(txn):
Expand Down Expand Up @@ -682,18 +680,17 @@ def f(txn):
txn.execute(sql, args)
return self.db.cursor_to_dict(txn)

push_actions = yield self.db.runInteraction("get_push_actions_for_user", f)
push_actions = await self.db.runInteraction("get_push_actions_for_user", f)
for pa in push_actions:
pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
return push_actions

@defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self):
async def get_latest_push_action_stream_ordering(self):
def f(txn):
txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
return txn.fetchone()

result = yield self.db.runInteraction(
result = await self.db.runInteraction(
"get_latest_push_action_stream_ordering", f
)
return result[0] or 0
Expand Down Expand Up @@ -747,8 +744,7 @@ def _remove_old_push_actions_before_txn(
def _start_rotate_notifs(self):
return run_as_background_process("rotate_notifs", self._rotate_notifs)

@defer.inlineCallbacks
def _rotate_notifs(self):
async def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
return
self._doing_notif_rotation = True
Expand All @@ -757,12 +753,12 @@ def _rotate_notifs(self):
while True:
logger.info("Rotating notifications")

caught_up = yield self.db.runInteraction(
caught_up = await self.db.runInteraction(
"_rotate_notifs", self._rotate_notifs_txn
)
if caught_up:
break
yield self.hs.get_clock().sleep(self._rotate_delay)
await self.hs.get_clock().sleep(self._rotate_delay)
finally:
self._doing_notif_rotation = False

Expand Down
Loading

0 comments on commit c078f9e

Please sign in to comment.