Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix logcontexts for running pushers #3710

Merged
merged 2 commits into from
Aug 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3710.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix "Starting db txn 'get_all_updated_receipts' from sentinel context" warning
4 changes: 2 additions & 2 deletions synapse/app/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ def poke_pushers(self, stream_name, token, rows):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
yield self.pusher_pool.on_new_notifications(
self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
yield self.pusher_pool.on_new_receipts(
self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
Expand Down
3 changes: 1 addition & 2 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2386,8 +2386,7 @@ def _notify_persisted_event(self, event, max_stream_id):
extra_users=extra_users
)

logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)

Expand Down
7 changes: 2 additions & 5 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,11 +774,8 @@ def is_inviter_member_event(e):
event, context=context
)

# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id
self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)

def _notify():
Expand Down
18 changes: 8 additions & 10 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from synapse.types import get_domain_from_id
from synapse.util import logcontext
from synapse.util.logcontext import PreserveLoggingContext

from ._base import BaseHandler

Expand Down Expand Up @@ -116,16 +115,15 @@ def _handle_new_receipts(self, receipts):

affected_room_ids = list(set([r["room_id"] for r in receipts]))

with PreserveLoggingContext():
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
)
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)

defer.returnValue(True)
defer.returnValue(True)

@logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks
Expand Down
17 changes: 15 additions & 2 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory
from synapse.util.logcontext import make_deferred_yieldable, run_in_background

Expand Down Expand Up @@ -122,8 +123,14 @@ def remove_pushers_by_access_token(self, user_id, access_tokens):
p['app_id'], p['pushkey'], p['user_name'],
)

@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
run_as_background_process(
"on_new_notifications",
self._on_new_notifications, min_stream_id, max_stream_id,
)

@defer.inlineCallbacks
def _on_new_notifications(self, min_stream_id, max_stream_id):
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
Expand All @@ -147,8 +154,14 @@ def on_new_notifications(self, min_stream_id, max_stream_id):
except Exception:
logger.exception("Exception in pusher on_new_notifications")

@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
run_as_background_process(
"on_new_receipts",
self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
)

@defer.inlineCallbacks
def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
Expand Down