This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert appservice to async. #7973

Merged

merged 11 commits on Jul 30, 2020
1 change: 1 addition & 0 deletions changelog.d/7973.misc
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
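
Every hunk below applies the same mechanical conversion: Twisted's @defer.inlineCallbacks decorator plus yield becomes a native async def plus await, and @cachedInlineCallbacks becomes @cached on an async method. A minimal sketch of the pattern, using hypothetical names (fetch_title_old, fetch_title_new, and get_json are stand-ins for illustration, not Synapse functions):

    from twisted.internet import defer


    # Before: a Twisted generator-based coroutine.
    @defer.inlineCallbacks
    def fetch_title_old(get_json, uri):
        # `yield` unwraps the Deferred returned by get_json.
        body = yield get_json(uri)
        return body.get("title")


    # After: a native coroutine, as used throughout this PR.
    async def fetch_title_new(get_json, uri):
        # `await` suspends until the awaitable resolves; Twisted Deferreds are
        # awaitable too, so callers can mix the two styles during migration.
        body = await get_json(uri)
        return body.get("title")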
31 changes: 13 additions & 18 deletions synapse/appservice/__init__.py
@@ -15,11 +15,9 @@
import logging
import re

from twisted.internet import defer

from synapse.api.constants import EventTypes
from synapse.types import GroupID, get_domain_from_id
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.descriptors import cached

logger = logging.getLogger(__name__)

@@ -43,7 +41,7 @@ def send(self, as_api):
Args:
as_api(ApplicationServiceApi): The API to use to send.
Returns:
A Deferred which resolves to True if the transaction was sent.
An Awaitable which resolves to True if the transaction was sent.
"""
return as_api.push_bulk(
service=self.service, events=self.events, txn_id=self.id
@@ -172,8 +170,7 @@ def _is_exclusive(self, ns_key, test_string):
return regex_obj["exclusive"]
return False

@defer.inlineCallbacks
def _matches_user(self, event, store):
async def _matches_user(self, event, store):
if not event:
return False

@@ -188,12 +185,12 @@ def _matches_user(self, event, store):
if not store:
return False

does_match = yield self._matches_user_in_member_list(event.room_id, store)
does_match = await self._matches_user_in_member_list(event.room_id, store)
return does_match

@cachedInlineCallbacks(num_args=1, cache_context=True)
def _matches_user_in_member_list(self, room_id, store, cache_context):
member_list = yield store.get_users_in_room(
@cached(num_args=1, cache_context=True)
async def _matches_user_in_member_list(self, room_id, store, cache_context):
member_list = await store.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
)

@@ -208,35 +205,33 @@ def _matches_room_id(self, event):
return self.is_interested_in_room(event.room_id)
return False

@defer.inlineCallbacks
def _matches_aliases(self, event, store):
async def _matches_aliases(self, event, store):
if not store or not event:
return False

alias_list = yield store.get_aliases_for_room(event.room_id)
alias_list = await store.get_aliases_for_room(event.room_id)
for alias in alias_list:
if self.is_interested_in_alias(alias):
return True
return False

@defer.inlineCallbacks
def is_interested(self, event, store=None):
async def is_interested(self, event, store=None) -> bool:
"""Check if this service is interested in this event.

Args:
event(Event): The event to check.
store(DataStore)
Returns:
bool: True if this service would like to know about this event.
True if this service would like to know about this event.
"""
# Do cheap checks first
if self._matches_room_id(event):
return True

if (yield self._matches_aliases(event, store)):
if await self._matches_aliases(event, store):
return True

if (yield self._matches_user(event, store)):
if await self._matches_user(event, store):
return True

return False
21 changes: 8 additions & 13 deletions synapse/appservice/api.py
@@ -93,13 +93,12 @@ def __init__(self, hs):
hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS
)

@defer.inlineCallbacks
def query_user(self, service, user_id):
async def query_user(self, service, user_id):
if service.url is None:
return False
uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
try:
response = yield self.get_json(uri, {"access_token": service.hs_token})
response = await self.get_json(uri, {"access_token": service.hs_token})
if response is not None: # just an empty json object
return True
except CodeMessageException as e:
@@ -110,14 +109,12 @@ def query_user(self, service, user_id):
logger.warning("query_user to %s threw exception %s", uri, ex)
return False

@defer.inlineCallbacks
def query_alias(self, service, alias):
async def query_alias(self, service, alias):
if service.url is None:
return False
uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
response = None
try:
response = yield self.get_json(uri, {"access_token": service.hs_token})
response = await self.get_json(uri, {"access_token": service.hs_token})
if response is not None: # just an empty json object
return True
except CodeMessageException as e:
@@ -128,8 +125,7 @@ def query_alias(self, service, alias):
logger.warning("query_alias to %s threw exception %s", uri, ex)
Member:

Hmm, the:

            if e.code == 404:
                return False

bit here is also useless.

Member Author:

Why is that useless?

I'm unsure if you're asking me to make a change.

Member:

Sorry, I had noticed you removed the response = None line above as it wasn't necessary.

I'd say it's useless because, if that if statement doesn't trigger, the function falls through and returns False after the try/except anyway.

Though it may have been that returning True for any code other than 404 was the intention here.
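
For reference, here is a condensed sketch of the query_alias control flow under discussion (the exception-handling bodies are abbreviated and partly assumed, not the exact Synapse code). Whichever way the e.code == 404 test goes, execution reaches the final return False, so the early return only matters if returning True for other error codes was intended:

    async def query_alias(self, service, alias):
        if service.url is None:
            return False
        uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
        try:
            response = await self.get_json(uri, {"access_token": service.hs_token})
            if response is not None:  # just an empty json object
                return True
        except CodeMessageException as e:
            logger.warning("query_alias to %s received %s", uri, e)
            if e.code == 404:
                return False  # redundant: the fall-through below returns False anyway
        except Exception as ex:
            logger.warning("query_alias to %s threw exception %s", uri, ex)
        return False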

return False

@defer.inlineCallbacks
def query_3pe(self, service, kind, protocol, fields):
async def query_3pe(self, service, kind, protocol, fields):
if kind == ThirdPartyEntityKind.USER:
required_field = "userid"
elif kind == ThirdPartyEntityKind.LOCATION:
@@ -146,7 +142,7 @@ def query_3pe(self, service, kind, protocol, fields):
urllib.parse.quote(protocol),
)
try:
response = yield self.get_json(uri, fields)
response = await self.get_json(uri, fields)
if not isinstance(response, list):
logger.warning(
"query_3pe to %s returned an invalid response %r", uri, response
@@ -202,8 +198,7 @@ def _get():
key = (service.id, protocol)
return self.protocol_meta_cache.wrap(key, _get)

@defer.inlineCallbacks
def push_bulk(self, service, events, txn_id=None):
async def push_bulk(self, service, events, txn_id=None):
if service.url is None:
return True

@@ -218,7 +213,7 @@ def push_bulk(self, service, events, txn_id=None):

uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
try:
yield self.put_json(
await self.put_json(
uri=uri,
json_body={"events": events},
args={"access_token": service.hs_token},
49 changes: 20 additions & 29 deletions synapse/appservice/scheduler.py
@@ -50,8 +50,6 @@
"""
import logging

from twisted.internet import defer

from synapse.appservice import ApplicationServiceState
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -73,12 +71,11 @@ def __init__(self, hs):
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)

@defer.inlineCallbacks
def start(self):
async def start(self):
logger.info("Starting appservice scheduler")

# check for any DOWN ASes and start recoverers for them.
services = yield self.store.get_appservices_by_state(
services = await self.store.get_appservices_by_state(
ApplicationServiceState.DOWN
)

@@ -117,8 +114,7 @@ def enqueue(self, service, event):
"as-sender-%s" % (service.id,), self._send_request, service
)

@defer.inlineCallbacks
def _send_request(self, service):
async def _send_request(self, service):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
assert service.id not in self.requests_in_flight
@@ -130,7 +126,7 @@ def _send_request(self, service):
if not events:
return
try:
yield self.txn_ctrl.send(service, events)
await self.txn_ctrl.send(service, events)
except Exception:
logger.exception("AS request failed")
finally:
@@ -162,36 +158,33 @@ def __init__(self, clock, store, as_api):
# for UTs
self.RECOVERER_CLASS = _Recoverer

@defer.inlineCallbacks
def send(self, service, events):
async def send(self, service, events):
try:
txn = yield self.store.create_appservice_txn(service=service, events=events)
service_is_up = yield self._is_service_up(service)
txn = await self.store.create_appservice_txn(service=service, events=events)
service_is_up = await self._is_service_up(service)
if service_is_up:
sent = yield txn.send(self.as_api)
sent = await txn.send(self.as_api)
if sent:
yield txn.complete(self.store)
await txn.complete(self.store)
else:
run_in_background(self._on_txn_fail, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._on_txn_fail, service)

@defer.inlineCallbacks
def on_recovered(self, recoverer):
async def on_recovered(self, recoverer):
logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id
)
self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
await self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP
)

@defer.inlineCallbacks
def _on_txn_fail(self, service):
async def _on_txn_fail(self, service):
try:
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
await self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
self.start_recoverer(service)
except Exception:
logger.exception("Error starting AS recoverer")
@@ -211,9 +204,8 @@ def start_recoverer(self, service):
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))

@defer.inlineCallbacks
def _is_service_up(self, service):
state = yield self.store.get_appservice_state(service)
async def _is_service_up(self, service):
state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None


@@ -254,25 +246,24 @@ def _backoff(self):
self.backoff_counter += 1
self.recover()

@defer.inlineCallbacks
def retry(self):
async def retry(self):
logger.info("Starting retries on %s", self.service.id)
try:
while True:
txn = yield self.store.get_oldest_unsent_txn(self.service)
txn = await self.store.get_oldest_unsent_txn(self.service)
if not txn:
# nothing left: we're done!
self.callback(self)
await self.callback(self)
return

logger.info(
"Retrying transaction %s for AS ID %s", txn.id, txn.service.id
)
sent = yield txn.send(self.as_api)
sent = await txn.send(self.as_api)
if not sent:
break

yield txn.complete(self.store)
await txn.complete(self.store)

# reset the backoff counter and then process the next transaction
self.backoff_counter = 1
10 changes: 5 additions & 5 deletions synapse/handlers/appservice.py
@@ -27,7 +27,6 @@
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import log_failure
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)
@@ -100,10 +99,11 @@ async def handle_event(event):

if not self.started_scheduler:

def start_scheduler():
return self.scheduler.start().addErrback(
log_failure, "Application Services Failure"
)
async def start_scheduler():
try:
return await self.scheduler.start()
except Exception:
logger.error("Application Services Failure")

run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True