Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert streams to async. #8014

Merged
merged 3 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8014.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
4 changes: 2 additions & 2 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async def _snapshot_all_rooms(

rooms_ret = []

now_token = await self.hs.get_event_sources().get_current_token()
now_token = self.hs.get_event_sources().get_current_token()

presence_stream = self.hs.get_event_sources().sources["presence"]
pagination_config = PaginationConfig(from_token=now_token)
Expand Down Expand Up @@ -360,7 +360,7 @@ async def _room_initial_sync_joined(
current_state.values(), time_now
)

now_token = await self.hs.get_event_sources().get_current_token()
now_token = self.hs.get_event_sources().get_current_token()

limit = pagin_config.limit if pagin_config else None
if limit is None:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ async def get_messages(
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
await self.hs.get_event_sources().get_current_token_for_pagination()
self.hs.get_event_sources().get_current_token_for_pagination()
)
room_token = pagin_config.from_token.room_key

Expand Down
10 changes: 5 additions & 5 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import math
import string
from collections import OrderedDict
from typing import Optional, Tuple
from typing import Awaitable, Optional, Tuple

from synapse.api.constants import (
EventTypes,
Expand Down Expand Up @@ -1041,7 +1041,7 @@ async def get_new_events(
):
# We just ignore the key for now.

to_key = await self.get_current_key()
to_key = self.get_current_key()

from_token = RoomStreamToken.parse(from_key)
if from_token.topological:
Expand Down Expand Up @@ -1081,10 +1081,10 @@ async def get_new_events(

return (events, end_key)

def get_current_key(self):
return self.store.get_room_events_max_id()
def get_current_key(self) -> str:
return "s%d" % (self.store.get_room_max_stream_ordering(),)

def get_current_key_for_room(self, room_id):
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)


Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ async def search(self, user, content, batch=None):
# If client has asked for "context" for each event (i.e. some surrounding
# events and state), fetch that
if event_context is not None:
now_token = await self.hs.get_event_sources().get_current_token()
now_token = self.hs.get_event_sources().get_current_token()

contexts = {}
for event in allowed_events:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ async def generate_sync_result(
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = await self.event_sources.get_current_token()
now_token = self.event_sources.get_current_token()

logger.debug(
"Calculating sync response for %r between %s and %s",
Expand Down
4 changes: 2 additions & 2 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ async def wait_for_events(
"""
user_stream = self.user_to_user_stream.get(user_id)
if user_stream is None:
current_token = await self.event_sources.get_current_token()
current_token = self.event_sources.get_current_token()
if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
Expand Down Expand Up @@ -397,7 +397,7 @@ async def get_events_for(
"""
from_token = pagination_config.from_token
if not from_token:
from_token = await self.event_sources.get_current_token()
from_token = self.event_sources.get_current_token()

limit = pagination_config.limit

Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/data_stores/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import abc
import logging
from collections import namedtuple
from typing import Optional

from twisted.internet import defer

Expand Down Expand Up @@ -557,19 +558,18 @@ def _f(txn):

return self.db.runInteraction("get_room_event_before_stream_ordering", _f)

@defer.inlineCallbacks
def get_room_events_max_id(self, room_id=None):
async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
"""Returns the current token for rooms stream.

By default, it returns the current global stream token. Specifying a
`room_id` causes it to return the current room specific topological
token.
"""
token = yield self.get_room_max_stream_ordering()
token = self.get_room_max_stream_ordering()
if room_id is None:
return "s%d" % (token,)
else:
topo = yield self.db.runInteraction(
topo = await self.db.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, room_id
)
return "t%d-%d" % (topo, token)
Expand Down
22 changes: 9 additions & 13 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

from typing import Any, Dict

from twisted.internet import defer

from synapse.handlers.account_data import AccountDataEventSource
from synapse.handlers.presence import PresenceEventSource
from synapse.handlers.receipts import ReceiptEventSource
Expand All @@ -40,39 +38,37 @@ def __init__(self, hs):
} # type: Dict[str, Any]
self.store = hs.get_datastore()

@defer.inlineCallbacks
def get_current_token(self):
def get_current_token(self) -> StreamToken:
push_rules_key, _ = self.store.get_push_rules_stream_token()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
groups_key = self.store.get_group_stream_token()

token = StreamToken(
room_key=(yield self.sources["room"].get_current_key()),
presence_key=(yield self.sources["presence"].get_current_key()),
typing_key=(yield self.sources["typing"].get_current_key()),
receipt_key=(yield self.sources["receipt"].get_current_key()),
account_data_key=(yield self.sources["account_data"].get_current_key()),
room_key=self.sources["room"].get_current_key(),
presence_key=self.sources["presence"].get_current_key(),
typing_key=self.sources["typing"].get_current_key(),
receipt_key=self.sources["receipt"].get_current_key(),
account_data_key=self.sources["account_data"].get_current_key(),
push_rules_key=push_rules_key,
to_device_key=to_device_key,
device_list_key=device_list_key,
groups_key=groups_key,
)
return token

@defer.inlineCallbacks
def get_current_token_for_pagination(self):
def get_current_token_for_pagination(self) -> StreamToken:
"""Get the current token for a given room to be used to paginate
events.

The returned token does not have the current values for fields other
than `room`, since they are not used during pagination.

Returns:
Deferred[StreamToken]
The current token for pagination.
"""
token = StreamToken(
room_key=(yield self.sources["room"].get_current_key()),
room_key=self.sources["room"].get_current_key(),
presence_key=0,
typing_key=0,
receipt_key=0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def test_server_notice_only_sent_once(self):
self.server_notices_manager.get_or_create_notice_room_for_user(self.user_id)
)

token = self.get_success(self.event_source.get_current_token())
token = self.event_source.get_current_token()
events, _ = self.get_success(
self.store.get_recent_events_for_room(
room_id, limit=100, end_token=token.room_key
Expand Down