Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert synapse.app to async/await. (#7868)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Jul 17, 2020
1 parent 6fca1b3 commit 00e57b7
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 41 deletions.
1 change: 1 addition & 0 deletions changelog.d/7868.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert synapse.app and federation client to async/await.
12 changes: 5 additions & 7 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from typing_extensions import ContextManager

from twisted.internet import address, defer, reactor
from twisted.internet import address, reactor

import synapse
import synapse.events
Expand Down Expand Up @@ -375,9 +375,8 @@ def _user_syncing():

return _user_syncing()

@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
parties = yield get_interested_parties(self.store, states)
async def notify_from_replication(self, states, stream_id):
parties = await get_interested_parties(self.store, states)
room_ids_to_states, users_to_states = parties

self.notifier.on_new_event(
Expand All @@ -387,8 +386,7 @@ def notify_from_replication(self, states, stream_id):
users=users_to_states.keys(),
)

@defer.inlineCallbacks
def process_replication_rows(self, token, rows):
async def process_replication_rows(self, token, rows):
states = [
UserPresenceState(
row.user_id,
Expand All @@ -406,7 +404,7 @@ def process_replication_rows(self, token, rows):
self.user_to_current_state[state.user_id] = state

stream_id = token
yield self.notify_from_replication(states, stream_id)
await self.notify_from_replication(states, stream_id)

def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [
Expand Down
25 changes: 12 additions & 13 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,7 @@ def stopService(self):
_stats_process = []


@defer.inlineCallbacks
def phone_stats_home(hs, stats, stats_process=_stats_process):
async def phone_stats_home(hs, stats, stats_process=_stats_process):
logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time())
uptime = int(now - hs.start_time)
Expand Down Expand Up @@ -522,28 +521,28 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
stats["python_version"] = "{}.{}.{}".format(
version.major, version.minor, version.micro
)
stats["total_users"] = yield hs.get_datastore().count_all_users()
stats["total_users"] = await hs.get_datastore().count_all_users()

total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
total_nonbridged_users = await hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users

daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
daily_user_type_results = await hs.get_datastore().count_daily_user_type()
for name, count in daily_user_type_results.items():
stats["daily_user_type_" + name] = count

room_count = yield hs.get_datastore().get_room_count()
room_count = await hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count

stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users()
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
stats["daily_active_users"] = await hs.get_datastore().count_daily_users()
stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users()
stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = await hs.get_datastore().count_daily_messages()

r30_results = yield hs.get_datastore().count_r30_users()
r30_results = await hs.get_datastore().count_r30_users()
for name, count in r30_results.items():
stats["r30_users_" + name] = count

daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
daily_sent_messages = await hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages
stats["cache_factor"] = hs.config.caches.global_factor
stats["event_cache_size"] = hs.config.caches.event_cache_size
Expand All @@ -558,7 +557,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):

logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
try:
yield hs.get_proxied_http_client().put_json(
await hs.get_proxied_http_client().put_json(
hs.config.report_stats_endpoint, stats
)
except Exception as e:
Expand Down
40 changes: 19 additions & 21 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,29 +374,26 @@ async def _check_sigs_and_hash_and_fetch(
"""
deferreds = self._check_sigs_and_hashes(room_version, pdus)

@defer.inlineCallbacks
def handle_check_result(pdu: EventBase, deferred: Deferred):
async def handle_check_result(pdu: EventBase, deferred: Deferred):
try:
res = yield make_deferred_yieldable(deferred)
res = await make_deferred_yieldable(deferred)
except SynapseError:
res = None

if not res:
# Check local db.
res = yield self.store.get_event(
res = await self.store.get_event(
pdu.event_id, allow_rejected=True, allow_none=True
)

if not res and pdu.origin != origin:
try:
res = yield defer.ensureDeferred(
self.get_pdu(
destinations=[pdu.origin],
event_id=pdu.event_id,
room_version=room_version,
outlier=outlier,
timeout=10000,
)
res = await self.get_pdu(
destinations=[pdu.origin],
event_id=pdu.event_id,
room_version=room_version,
outlier=outlier,
timeout=10000,
)
except SynapseError:
pass
Expand Down Expand Up @@ -995,24 +992,25 @@ async def forward_third_party_invite(self, destinations, room_id, event_dict):

raise RuntimeError("Failed to send to any server.")

@defer.inlineCallbacks
def get_room_complexity(self, destination, room_id):
async def get_room_complexity(
self, destination: str, room_id: str
) -> Optional[dict]:
"""
Fetch the complexity of a remote room from another server.
Args:
destination (str): The remote server
room_id (str): The room ID to ask about.
destination: The remote server
room_id: The room ID to ask about.
Returns:
Deferred[dict] or Deferred[None]: Dict contains the complexity
metric versions, while None means we could not fetch the complexity.
Dict contains the complexity metric versions, while None means we
could not fetch the complexity.
"""
try:
complexity = yield self.transport_layer.get_room_complexity(
complexity = await self.transport_layer.get_room_complexity(
destination=destination, room_id=room_id
)
defer.returnValue(complexity)
return complexity
except CodeMessageException as e:
# We didn't manage to get it -- probably a 404. We are okay if other
# servers don't give it to us.
Expand All @@ -1029,4 +1027,4 @@ def get_room_complexity(self, destination, room_id):

# If we don't manage to find it, return None. It's not an error if a
# server doesn't give it to us.
defer.returnValue(None)
return None

0 comments on commit 00e57b7

Please sign in to comment.