Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert replication code to async/await. (#7987)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Aug 3, 2020
1 parent db5970a commit 3b415e2
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 38 deletions.
1 change: 1 addition & 0 deletions changelog.d/7987.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
2 changes: 1 addition & 1 deletion synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ def register_with_store(
address (str|None): the IP address used to perform the registration.
Returns:
Deferred
Awaitable
"""
if self.hs.config.worker_app:
return self._register_client(
Expand Down
18 changes: 7 additions & 11 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
from inspect import signature
from typing import Dict, List, Tuple

from twisted.internet import defer

from synapse.api.errors import (
CodeMessageException,
HttpResponseException,
Expand Down Expand Up @@ -101,7 +99,7 @@ def __init__(self, hs):
assert self.METHOD in ("PUT", "POST", "GET")

@abc.abstractmethod
def _serialize_payload(**kwargs):
async def _serialize_payload(**kwargs):
"""Static method that is called when creating a request.
Concrete implementations should have explicit parameters (rather than
Expand All @@ -110,9 +108,8 @@ def _serialize_payload(**kwargs):
argument list.
Returns:
Deferred[dict]|dict: If POST/PUT request then dictionary must be
JSON serialisable, otherwise must be appropriate for adding as
query args.
dict: If POST/PUT request then dictionary must be JSON serialisable,
otherwise must be appropriate for adding as query args.
"""
return {}

Expand Down Expand Up @@ -144,8 +141,7 @@ def make_client(cls, hs):
instance_map = hs.config.worker.instance_map

@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(instance_name="master", **kwargs):
async def send_request(instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
Expand All @@ -159,7 +155,7 @@ def send_request(instance_name="master", **kwargs):
"Instance %r not in 'instance_map' config" % (instance_name,)
)

data = yield cls._serialize_payload(**kwargs)
data = await cls._serialize_payload(**kwargs)

url_args = [
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
Expand Down Expand Up @@ -197,7 +193,7 @@ def send_request(instance_name="master", **kwargs):
headers = {} # type: Dict[bytes, List[bytes]]
inject_active_span_byte_dict(headers, None, check_destination=False)
try:
result = yield request_func(uri, data, headers=headers)
result = await request_func(uri, data, headers=headers)
break
except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
Expand All @@ -207,7 +203,7 @@ def send_request(instance_name="master", **kwargs):

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield clock.sleep(1)
await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/http/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()

@staticmethod
def _serialize_payload(user_id):
async def _serialize_payload(user_id):
return {}

async def _handle_request(self, request, user_id):
Expand Down
17 changes: 6 additions & 11 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.events.snapshot import EventContext
Expand Down Expand Up @@ -67,8 +65,7 @@ def __init__(self, hs):
self.federation_handler = hs.get_handlers().federation_handler

@staticmethod
@defer.inlineCallbacks
def _serialize_payload(store, event_and_contexts, backfilled):
async def _serialize_payload(store, event_and_contexts, backfilled):
"""
Args:
store
Expand All @@ -78,9 +75,7 @@ def _serialize_payload(store, event_and_contexts, backfilled):
"""
event_payloads = []
for event, context in event_and_contexts:
serialized_context = yield defer.ensureDeferred(
context.serialize(event, store)
)
serialized_context = await context.serialize(event, store)

event_payloads.append(
{
Expand Down Expand Up @@ -156,7 +151,7 @@ def __init__(self, hs):
self.registry = hs.get_federation_registry()

@staticmethod
def _serialize_payload(edu_type, origin, content):
async def _serialize_payload(edu_type, origin, content):
return {"origin": origin, "content": content}

async def _handle_request(self, request, edu_type):
Expand Down Expand Up @@ -199,7 +194,7 @@ def __init__(self, hs):
self.registry = hs.get_federation_registry()

@staticmethod
def _serialize_payload(query_type, args):
async def _serialize_payload(query_type, args):
"""
Args:
query_type (str)
Expand Down Expand Up @@ -240,7 +235,7 @@ def __init__(self, hs):
self.store = hs.get_datastore()

@staticmethod
def _serialize_payload(room_id, args):
async def _serialize_payload(room_id, args):
"""
Args:
room_id (str)
Expand Down Expand Up @@ -275,7 +270,7 @@ def __init__(self, hs):
self.store = hs.get_datastore()

@staticmethod
def _serialize_payload(room_id, room_version):
async def _serialize_payload(room_id, room_version):
return {"room_version": room_version.identifier}

async def _handle_request(self, request, room_id):
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/http/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, hs):
self.registration_handler = hs.get_registration_handler()

@staticmethod
def _serialize_payload(user_id, device_id, initial_display_name, is_guest):
async def _serialize_payload(user_id, device_id, initial_display_name, is_guest):
"""
Args:
device_id (str|None): Device ID to use, if None a new one is
Expand Down
8 changes: 5 additions & 3 deletions synapse/replication/http/membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ def __init__(self, hs):
self.clock = hs.get_clock()

@staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
async def _serialize_payload(
requester, room_id, user_id, remote_room_hosts, content
):
"""
Args:
requester(Requester)
Expand Down Expand Up @@ -112,7 +114,7 @@ def __init__(self, hs: "HomeServer"):
self.member_handler = hs.get_room_member_handler()

@staticmethod
def _serialize_payload( # type: ignore
async def _serialize_payload( # type: ignore
invite_event_id: str,
txn_id: Optional[str],
requester: Requester,
Expand Down Expand Up @@ -174,7 +176,7 @@ def __init__(self, hs):
self.distributor = hs.get_distributor()

@staticmethod
def _serialize_payload(room_id, user_id, change):
async def _serialize_payload(room_id, user_id, change):
"""
Args:
room_id (str)
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/http/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, hs: "HomeServer"):
self._presence_handler = hs.get_presence_handler()

@staticmethod
def _serialize_payload(user_id):
async def _serialize_payload(user_id):
return {}

async def _handle_request(self, request, user_id):
Expand Down Expand Up @@ -92,7 +92,7 @@ def __init__(self, hs: "HomeServer"):
self._presence_handler = hs.get_presence_handler()

@staticmethod
def _serialize_payload(user_id, state, ignore_status_msg=False):
async def _serialize_payload(user_id, state, ignore_status_msg=False):
return {
"state": state,
"ignore_status_msg": ignore_status_msg,
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/http/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, hs):
self.registration_handler = hs.get_registration_handler()

@staticmethod
def _serialize_payload(
async def _serialize_payload(
user_id,
password_hash,
was_guest,
Expand Down Expand Up @@ -105,7 +105,7 @@ def __init__(self, hs):
self.registration_handler = hs.get_registration_handler()

@staticmethod
def _serialize_payload(user_id, auth_result, access_token):
async def _serialize_payload(user_id, auth_result, access_token):
"""
Args:
user_id (str): The user ID that consented
Expand Down
7 changes: 2 additions & 5 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.events.snapshot import EventContext
Expand Down Expand Up @@ -62,8 +60,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()

@staticmethod
@defer.inlineCallbacks
def _serialize_payload(
async def _serialize_payload(
event_id, store, event, context, requester, ratelimit, extra_users
):
"""
Expand All @@ -77,7 +74,7 @@ def _serialize_payload(
extra_users (list(UserID)): Any extra users to notify about event
"""

serialized_context = yield defer.ensureDeferred(context.serialize(event, store))
serialized_context = await context.serialize(event, store)

payload = {
"event": event.get_pdu_json(),
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/http/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, hs):
self.streams = hs.get_replication_streams()

@staticmethod
def _serialize_payload(stream_name, from_token, upto_token):
async def _serialize_payload(stream_name, from_token, upto_token):
return {"from_token": from_token, "upto_token": upto_token}

async def _handle_request(self, request, stream_name):
Expand Down

0 comments on commit 3b415e2

Please sign in to comment.