Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #6496 from matrix-org/erikj/initial_sync_asnyc
Browse files Browse the repository at this point in the history
* commit 'caa52836e':
  Newsfile
  Port synapse.handlers.initial_sync to async/await
  • Loading branch information
anoadragon453 committed Mar 19, 2020
2 parents e5592ca + caa5283 commit 3e5e7f2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 52 deletions.
1 change: 1 addition & 0 deletions changelog.d/6496.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Port synapse.handlers.initial_sync to async/await.
96 changes: 44 additions & 52 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ def snapshot_all_rooms(
include_archived,
)

@defer.inlineCallbacks
def _snapshot_all_rooms(
async def _snapshot_all_rooms(
self,
user_id=None,
pagin_config=None,
Expand All @@ -102,41 +101,40 @@ def _snapshot_all_rooms(
if include_archived:
memberships.append(Membership.LEAVE)

room_list = yield self.store.get_rooms_for_user_where_membership_is(
room_list = await self.store.get_rooms_for_user_where_membership_is(
user_id=user_id, membership_list=memberships
)

user = UserID.from_string(user_id)

rooms_ret = []

now_token = yield self.hs.get_event_sources().get_current_token()
now_token = await self.hs.get_event_sources().get_current_token()

presence_stream = self.hs.get_event_sources().sources["presence"]
pagination_config = PaginationConfig(from_token=now_token)
presence, _ = yield presence_stream.get_pagination_rows(
presence, _ = await presence_stream.get_pagination_rows(
user, pagination_config.get_source_config("presence"), None
)

receipt_stream = self.hs.get_event_sources().sources["receipt"]
receipt, _ = yield receipt_stream.get_pagination_rows(
receipt, _ = await receipt_stream.get_pagination_rows(
user, pagination_config.get_source_config("receipt"), None
)

tags_by_room = yield self.store.get_tags_for_user(user_id)
tags_by_room = await self.store.get_tags_for_user(user_id)

account_data, account_data_by_room = yield self.store.get_account_data_for_user(
account_data, account_data_by_room = await self.store.get_account_data_for_user(
user_id
)

public_room_ids = yield self.store.get_public_room_ids()
public_room_ids = await self.store.get_public_room_ids()

limit = pagin_config.limit
if limit is None:
limit = 10

@defer.inlineCallbacks
def handle_room(event):
async def handle_room(event):
d = {
"room_id": event.room_id,
"membership": event.membership,
Expand All @@ -149,8 +147,8 @@ def handle_room(event):
time_now = self.clock.time_msec()
d["inviter"] = event.sender

invite_event = yield self.store.get_event(event.event_id)
d["invite"] = yield self._event_serializer.serialize_event(
invite_event = await self.store.get_event(event.event_id)
d["invite"] = await self._event_serializer.serialize_event(
invite_event, time_now, as_client_event
)

Expand All @@ -174,7 +172,7 @@ def handle_room(event):
lambda states: states[event.event_id]
)

(messages, token), current_state = yield make_deferred_yieldable(
(messages, token), current_state = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
Expand All @@ -188,7 +186,7 @@ def handle_room(event):
)
).addErrback(unwrapFirstError)

messages = yield filter_events_for_client(
messages = await filter_events_for_client(
self.storage, user_id, messages
)

Expand All @@ -198,15 +196,15 @@ def handle_room(event):

d["messages"] = {
"chunk": (
yield self._event_serializer.serialize_events(
await self._event_serializer.serialize_events(
messages, time_now=time_now, as_client_event=as_client_event
)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
}

d["state"] = yield self._event_serializer.serialize_events(
d["state"] = await self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
as_client_event=as_client_event,
Expand All @@ -229,7 +227,7 @@ def handle_room(event):
except Exception:
logger.exception("Failed to get snapshot")

yield concurrently_execute(handle_room, room_list, 10)
await concurrently_execute(handle_room, room_list, 10)

account_data_events = []
for account_data_type, content in account_data.items():
Expand All @@ -253,8 +251,7 @@ def handle_room(event):

return ret

@defer.inlineCallbacks
def room_initial_sync(self, requester, room_id, pagin_config=None):
async def room_initial_sync(self, requester, room_id, pagin_config=None):
"""Capture the a snapshot of a room. If user is currently a member of
the room this will be what is currently in the room. If the user left
the room this will be what was in the room when they left.
Expand All @@ -271,58 +268,57 @@ def room_initial_sync(self, requester, room_id, pagin_config=None):
A JSON serialisable dict with the snapshot of the room.
"""

blocked = yield self.store.is_room_blocked(room_id)
blocked = await self.store.is_room_blocked(room_id)
if blocked:
raise SynapseError(403, "This room has been blocked on this server")

user_id = requester.user.to_string()

membership, member_event_id = yield self._check_in_room_or_world_readable(
membership, member_event_id = await self._check_in_room_or_world_readable(
room_id, user_id
)
is_peeking = member_event_id is None

if membership == Membership.JOIN:
result = yield self._room_initial_sync_joined(
result = await self._room_initial_sync_joined(
user_id, room_id, pagin_config, membership, is_peeking
)
elif membership == Membership.LEAVE:
result = yield self._room_initial_sync_parted(
result = await self._room_initial_sync_parted(
user_id, room_id, pagin_config, membership, member_event_id, is_peeking
)

account_data_events = []
tags = yield self.store.get_tags_for_room(user_id, room_id)
tags = await self.store.get_tags_for_room(user_id, room_id)
if tags:
account_data_events.append({"type": "m.tag", "content": {"tags": tags}})

account_data = yield self.store.get_account_data_for_room(user_id, room_id)
account_data = await self.store.get_account_data_for_room(user_id, room_id)
for account_data_type, content in account_data.items():
account_data_events.append({"type": account_data_type, "content": content})

result["account_data"] = account_data_events

return result

@defer.inlineCallbacks
def _room_initial_sync_parted(
async def _room_initial_sync_parted(
self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking
):
room_state = yield self.state_store.get_state_for_events([member_event_id])
room_state = await self.state_store.get_state_for_events([member_event_id])

room_state = room_state[member_event_id]

limit = pagin_config.limit if pagin_config else None
if limit is None:
limit = 10

stream_token = yield self.store.get_stream_token_for_event(member_event_id)
stream_token = await self.store.get_stream_token_for_event(member_event_id)

messages, token = yield self.store.get_recent_events_for_room(
messages, token = await self.store.get_recent_events_for_room(
room_id, limit=limit, end_token=stream_token
)

messages = yield filter_events_for_client(
messages = await filter_events_for_client(
self.storage, user_id, messages, is_peeking=is_peeking
)

Expand All @@ -336,33 +332,32 @@ def _room_initial_sync_parted(
"room_id": room_id,
"messages": {
"chunk": (
yield self._event_serializer.serialize_events(messages, time_now)
await self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
"state": (
yield self._event_serializer.serialize_events(
await self._event_serializer.serialize_events(
room_state.values(), time_now
)
),
"presence": [],
"receipts": [],
}

@defer.inlineCallbacks
def _room_initial_sync_joined(
async def _room_initial_sync_joined(
self, user_id, room_id, pagin_config, membership, is_peeking
):
current_state = yield self.state.get_current_state(room_id=room_id)
current_state = await self.state.get_current_state(room_id=room_id)

# TODO: These concurrently
time_now = self.clock.time_msec()
state = yield self._event_serializer.serialize_events(
state = await self._event_serializer.serialize_events(
current_state.values(), time_now
)

now_token = yield self.hs.get_event_sources().get_current_token()
now_token = await self.hs.get_event_sources().get_current_token()

limit = pagin_config.limit if pagin_config else None
if limit is None:
Expand All @@ -377,28 +372,26 @@ def _room_initial_sync_joined(

presence_handler = self.hs.get_presence_handler()

@defer.inlineCallbacks
def get_presence():
async def get_presence():
# If presence is disabled, return an empty list
if not self.hs.config.use_presence:
return []

states = yield presence_handler.get_states(
states = await presence_handler.get_states(
[m.user_id for m in room_members], as_event=True
)

return states

@defer.inlineCallbacks
def get_receipts():
receipts = yield self.store.get_linearized_receipts_for_room(
async def get_receipts():
receipts = await self.store.get_linearized_receipts_for_room(
room_id, to_key=now_token.receipt_key
)
if not receipts:
receipts = []
return receipts

presence, receipts, (messages, token) = yield make_deferred_yieldable(
presence, receipts, (messages, token) = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(get_presence),
Expand All @@ -414,7 +407,7 @@ def get_receipts():
).addErrback(unwrapFirstError)
)

messages = yield filter_events_for_client(
messages = await filter_events_for_client(
self.storage, user_id, messages, is_peeking=is_peeking
)

Expand All @@ -427,7 +420,7 @@ def get_receipts():
"room_id": room_id,
"messages": {
"chunk": (
yield self._event_serializer.serialize_events(messages, time_now)
await self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
Expand All @@ -441,18 +434,17 @@ def get_receipts():

return ret

@defer.inlineCallbacks
def _check_in_room_or_world_readable(self, room_id, user_id):
async def _check_in_room_or_world_readable(self, room_id, user_id):
try:
# check_user_was_in_room will return the most recent membership
# event for the user if:
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
member_event = await self.auth.check_user_was_in_room(room_id, user_id)
return member_event.membership, member_event.event_id
except AuthError:
visibility = yield self.state_handler.get_current_state(
visibility = await self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if (
Expand Down

0 comments on commit 3e5e7f2

Please sign in to comment.