Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert appservice to async. (#7973)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Jul 30, 2020
1 parent b3a97d6 commit 4cce8ef
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 103 deletions.
1 change: 1 addition & 0 deletions changelog.d/7973.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
31 changes: 13 additions & 18 deletions synapse/appservice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
import logging
import re

from twisted.internet import defer

from synapse.api.constants import EventTypes
from synapse.types import GroupID, get_domain_from_id
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.descriptors import cached

logger = logging.getLogger(__name__)

Expand All @@ -43,7 +41,7 @@ def send(self, as_api):
Args:
as_api(ApplicationServiceApi): The API to use to send.
Returns:
A Deferred which resolves to True if the transaction was sent.
An Awaitable which resolves to True if the transaction was sent.
"""
return as_api.push_bulk(
service=self.service, events=self.events, txn_id=self.id
Expand Down Expand Up @@ -172,8 +170,7 @@ def _is_exclusive(self, ns_key, test_string):
return regex_obj["exclusive"]
return False

@defer.inlineCallbacks
def _matches_user(self, event, store):
async def _matches_user(self, event, store):
if not event:
return False

Expand All @@ -188,12 +185,12 @@ def _matches_user(self, event, store):
if not store:
return False

does_match = yield self._matches_user_in_member_list(event.room_id, store)
does_match = await self._matches_user_in_member_list(event.room_id, store)
return does_match

@cachedInlineCallbacks(num_args=1, cache_context=True)
def _matches_user_in_member_list(self, room_id, store, cache_context):
member_list = yield store.get_users_in_room(
@cached(num_args=1, cache_context=True)
async def _matches_user_in_member_list(self, room_id, store, cache_context):
member_list = await store.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
)

Expand All @@ -208,35 +205,33 @@ def _matches_room_id(self, event):
return self.is_interested_in_room(event.room_id)
return False

@defer.inlineCallbacks
def _matches_aliases(self, event, store):
async def _matches_aliases(self, event, store):
if not store or not event:
return False

alias_list = yield store.get_aliases_for_room(event.room_id)
alias_list = await store.get_aliases_for_room(event.room_id)
for alias in alias_list:
if self.is_interested_in_alias(alias):
return True
return False

@defer.inlineCallbacks
def is_interested(self, event, store=None):
async def is_interested(self, event, store=None) -> bool:
"""Check if this service is interested in this event.
Args:
event(Event): The event to check.
store(DataStore)
Returns:
bool: True if this service would like to know about this event.
True if this service would like to know about this event.
"""
# Do cheap checks first
if self._matches_room_id(event):
return True

if (yield self._matches_aliases(event, store)):
if await self._matches_aliases(event, store):
return True

if (yield self._matches_user(event, store)):
if await self._matches_user(event, store):
return True

return False
Expand Down
21 changes: 8 additions & 13 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,12 @@ def __init__(self, hs):
hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS
)

@defer.inlineCallbacks
def query_user(self, service, user_id):
async def query_user(self, service, user_id):
if service.url is None:
return False
uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
try:
response = yield self.get_json(uri, {"access_token": service.hs_token})
response = await self.get_json(uri, {"access_token": service.hs_token})
if response is not None: # just an empty json object
return True
except CodeMessageException as e:
Expand All @@ -110,14 +109,12 @@ def query_user(self, service, user_id):
logger.warning("query_user to %s threw exception %s", uri, ex)
return False

@defer.inlineCallbacks
def query_alias(self, service, alias):
async def query_alias(self, service, alias):
if service.url is None:
return False
uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
response = None
try:
response = yield self.get_json(uri, {"access_token": service.hs_token})
response = await self.get_json(uri, {"access_token": service.hs_token})
if response is not None: # just an empty json object
return True
except CodeMessageException as e:
Expand All @@ -128,8 +125,7 @@ def query_alias(self, service, alias):
logger.warning("query_alias to %s threw exception %s", uri, ex)
return False

@defer.inlineCallbacks
def query_3pe(self, service, kind, protocol, fields):
async def query_3pe(self, service, kind, protocol, fields):
if kind == ThirdPartyEntityKind.USER:
required_field = "userid"
elif kind == ThirdPartyEntityKind.LOCATION:
Expand All @@ -146,7 +142,7 @@ def query_3pe(self, service, kind, protocol, fields):
urllib.parse.quote(protocol),
)
try:
response = yield self.get_json(uri, fields)
response = await self.get_json(uri, fields)
if not isinstance(response, list):
logger.warning(
"query_3pe to %s returned an invalid response %r", uri, response
Expand Down Expand Up @@ -202,8 +198,7 @@ def _get():
key = (service.id, protocol)
return self.protocol_meta_cache.wrap(key, _get)

@defer.inlineCallbacks
def push_bulk(self, service, events, txn_id=None):
async def push_bulk(self, service, events, txn_id=None):
if service.url is None:
return True

Expand All @@ -218,7 +213,7 @@ def push_bulk(self, service, events, txn_id=None):

uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
try:
yield self.put_json(
await self.put_json(
uri=uri,
json_body={"events": events},
args={"access_token": service.hs_token},
Expand Down
49 changes: 20 additions & 29 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
"""
import logging

from twisted.internet import defer

from synapse.appservice import ApplicationServiceState
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
Expand All @@ -73,12 +71,11 @@ def __init__(self, hs):
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)

@defer.inlineCallbacks
def start(self):
async def start(self):
logger.info("Starting appservice scheduler")

# check for any DOWN ASes and start recoverers for them.
services = yield self.store.get_appservices_by_state(
services = await self.store.get_appservices_by_state(
ApplicationServiceState.DOWN
)

Expand Down Expand Up @@ -117,8 +114,7 @@ def enqueue(self, service, event):
"as-sender-%s" % (service.id,), self._send_request, service
)

@defer.inlineCallbacks
def _send_request(self, service):
async def _send_request(self, service):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
assert service.id not in self.requests_in_flight
Expand All @@ -130,7 +126,7 @@ def _send_request(self, service):
if not events:
return
try:
yield self.txn_ctrl.send(service, events)
await self.txn_ctrl.send(service, events)
except Exception:
logger.exception("AS request failed")
finally:
Expand Down Expand Up @@ -162,36 +158,33 @@ def __init__(self, clock, store, as_api):
# for UTs
self.RECOVERER_CLASS = _Recoverer

@defer.inlineCallbacks
def send(self, service, events):
async def send(self, service, events):
try:
txn = yield self.store.create_appservice_txn(service=service, events=events)
service_is_up = yield self._is_service_up(service)
txn = await self.store.create_appservice_txn(service=service, events=events)
service_is_up = await self._is_service_up(service)
if service_is_up:
sent = yield txn.send(self.as_api)
sent = await txn.send(self.as_api)
if sent:
yield txn.complete(self.store)
await txn.complete(self.store)
else:
run_in_background(self._on_txn_fail, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._on_txn_fail, service)

@defer.inlineCallbacks
def on_recovered(self, recoverer):
async def on_recovered(self, recoverer):
logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id
)
self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
await self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP
)

@defer.inlineCallbacks
def _on_txn_fail(self, service):
async def _on_txn_fail(self, service):
try:
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
await self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
self.start_recoverer(service)
except Exception:
logger.exception("Error starting AS recoverer")
Expand All @@ -211,9 +204,8 @@ def start_recoverer(self, service):
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))

@defer.inlineCallbacks
def _is_service_up(self, service):
state = yield self.store.get_appservice_state(service)
async def _is_service_up(self, service):
state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None


Expand Down Expand Up @@ -254,25 +246,24 @@ def _backoff(self):
self.backoff_counter += 1
self.recover()

@defer.inlineCallbacks
def retry(self):
async def retry(self):
logger.info("Starting retries on %s", self.service.id)
try:
while True:
txn = yield self.store.get_oldest_unsent_txn(self.service)
txn = await self.store.get_oldest_unsent_txn(self.service)
if not txn:
# nothing left: we're done!
self.callback(self)
await self.callback(self)
return

logger.info(
"Retrying transaction %s for AS ID %s", txn.id, txn.service.id
)
sent = yield txn.send(self.as_api)
sent = await txn.send(self.as_api)
if not sent:
break

yield txn.complete(self.store)
await txn.complete(self.store)

# reset the backoff counter and then process the next transaction
self.backoff_counter = 1
Expand Down
10 changes: 5 additions & 5 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import log_failure
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -100,10 +99,11 @@ async def handle_event(event):

if not self.started_scheduler:

def start_scheduler():
return self.scheduler.start().addErrback(
log_failure, "Application Services Failure"
)
async def start_scheduler():
try:
return self.scheduler.start()
except Exception:
logger.error("Application Services Failure")

run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True
Expand Down
Loading

0 comments on commit 4cce8ef

Please sign in to comment.