This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove some run_in_background calls in replication code #7203

Merged 1 commit on Apr 3, 2020
1 change: 1 addition & 0 deletions changelog.d/7203.bugfix
@@ -0,0 +1 @@
+Fix some worker-mode replication handling not being correctly recorded in CPU usage stats.
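
For context on the changelog entry above: the change replaces fire-and-forget calls such as run_in_background with direct awaits, so the replication work completes inside the calling handler and its cost is attributed there rather than being spun off and missed by the CPU usage stats. The following is a minimal, generic asyncio sketch of the two call shapes. It deliberately avoids Synapse's LoggingContext and metrics machinery; the handler names (on_rdata_fire_and_forget, on_rdata_awaited) and row values are invented for illustration, and asyncio.ensure_future is only a loose stand-in for run_in_background.

import asyncio


async def process_and_notify(rows):
    # Stand-in for the real per-row processing; pretend it does some work.
    await asyncio.sleep(0.01)
    print("processed %d rows" % len(rows))


async def on_rdata_fire_and_forget(rows):
    # Old shape: schedule the work and return immediately. The caller gets no
    # handle on the work, so anything measuring "time spent handling rdata"
    # in this coroutine never sees the processing cost.
    asyncio.ensure_future(process_and_notify(rows))


async def on_rdata_awaited(rows):
    # New shape: await the work inline, so it completes before this coroutine
    # returns and its cost is attributed to the caller.
    await process_and_notify(rows)


async def main():
    await on_rdata_fire_and_forget([1, 2, 3])
    await on_rdata_awaited([4, 5, 6])
    # Give the fire-and-forget task a chance to finish before exiting.
    await asyncio.sleep(0.1)


asyncio.run(main())

In the diff below, the same shift is applied throughout: on_rdata awaits process_and_notify directly, process_replication_rows becomes a coroutine so its callers can await it, and the receipts handling is awaited instead of being wrapped in run_as_background_process.
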
16 changes: 8 additions & 8 deletions synapse/app/generic_worker.py
@@ -42,7 +42,7 @@
 from synapse.http.server import JsonResource
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.http.site import SynapseSite
-from synapse.logging.context import LoggingContext, run_in_background
+from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
@@ -635,7 +635,7 @@ async def on_rdata(self, stream_name, token, rows):
         await super(GenericWorkerReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
-        run_in_background(self.process_and_notify, stream_name, token, rows)
+        await self.process_and_notify(stream_name, token, rows)

     def get_streams_to_replicate(self):
         args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
@@ -650,7 +650,9 @@ def get_currently_syncing_users(self):
     async def process_and_notify(self, stream_name, token, rows):
         try:
             if self.send_handler:
-                self.send_handler.process_replication_rows(stream_name, token, rows)
+                await self.send_handler.process_replication_rows(
+                    stream_name, token, rows
+                )

             if stream_name == EventsStream.NAME:
                 # We shouldn't get multiple rows per token for events stream, so
@@ -782,22 +784,20 @@ def wake_destination(self, server: str):
     def stream_positions(self):
         return {"federation": self.federation_position}

-    def process_replication_rows(self, stream_name, token, rows):
+    async def process_replication_rows(self, stream_name, token, rows):
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
         if stream_name == "federation":
             send_queue.process_rows_for_federation(self.federation_sender, rows)
-            run_in_background(self.update_token, token)
+            await self.update_token(token)

         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
             self.federation_sender.notify_new_events(token)

         # ... and when new receipts happen
         elif stream_name == ReceiptsStream.NAME:
-            run_as_background_process(
-                "process_receipts_for_federation", self._on_new_receipts, rows
-            )
+            await self._on_new_receipts(rows)

         # ... as well as device updates and messages
         elif stream_name == DeviceListsStream.NAME: