Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

fix push notifications for invites from remote servers #11410

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11410.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix not sending out push notifications for invites from remote servers.
2 changes: 2 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,8 @@ async def on_invite_request(
event.room_id, [(event, context)]
)

await self._federation_event_handler.notify_remote_invite(event, context)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notes: So the reason this issue happens is that persist_events_and_notify above is being used instead of _run_push_actions_and_persist_event (or on_send_membership_event which calls it) which both persists the event and sets up some push actions in the staging area^1.

The reason it's done this way seems to be that _run_push_actions_and_persist_event requires that the event is not an outlier (event with unknown state), whereas these invites are outliers. (I'm not sure what reason this was done for: perhaps push rules need to know some state in the room, since push rules let you check things like room size etc?)

^1: (not sure what these actually are — looks like a way to prepare the results of push actions and the code that persists the events will move them over. Wonder why we bother doing all that in the database rather than passing them to the event persister....?)


return event

async def do_remotely_reject_invite(
Expand Down
23 changes: 23 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@
from synapse.logging.context import nested_logging_context, run_in_background
from synapse.logging.utils import log_function
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator
from synapse.push.httppusher import HttpPusher
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.state import StateResolutionStore
from synapse.storage.databases.main.event_push_actions import BasePushAction
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
PersistedEventPosition,
Expand Down Expand Up @@ -96,6 +99,7 @@ class FederationEventHandler:
"""

def __init__(self, hs: "HomeServer"):
self._hs = hs
self._store = hs.get_datastore()
self._storage = hs.get_storage()
self._state_store = self._storage.state
Expand Down Expand Up @@ -136,6 +140,25 @@ def __init__(self, hs: "HomeServer"):

self._room_pdu_linearizer = Linearizer("fed_room_pdu")

async def notify_remote_invite(
self, event: EventBase, context: EventContext
) -> None:
actions_by_user, _ = await BulkPushRuleEvaluator(
self._hs
).get_action_for_event_by_user(event, context)
if event.state_key not in actions_by_user:
# No pushers for target user. Nothing to do.
return
# The pusher expects a differently structured push_action, so we wrangle things into the correct shape here
action_for_pusher: BasePushAction = {
"actions": actions_by_user[event.state_key],
"event_id": event.event_id,
}
pusher_configs = await self._store.get_pushers_by_user_id(event.state_key)
for pusher_config in pusher_configs:
p = HttpPusher(self._hs, pusher_config)
await p.process_push_action(action_for_pusher)

async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
"""Process a PDU received via a federation /send/ transaction

Expand Down
2 changes: 1 addition & 1 deletion synapse/push/action_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ async def handle_push_actions_for_event(
self, event: EventBase, context: EventContext
) -> None:
with Measure(self.clock, "action_for_event_by_user"):
await self.bulk_evaluator.action_for_event_by_user(event, context)
await self.bulk_evaluator.store_action_for_event_by_user(event, context)
28 changes: 18 additions & 10 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,30 @@ async def _get_power_levels_and_sender_level(

return pl_event.content if pl_event else {}, sender_level

async def action_for_event_by_user(
async def store_action_for_event_by_user(
self, event: EventBase, context: EventContext
) -> None:
"""Given an event and context, evaluate the push rules, check if the message
should increment the unread count, and insert the results into the
event_push_actions_staging table.
"""
count_as_unread = _should_count_as_unread(event, context)
actions_by_user, count_as_unread = await self.get_action_for_event_by_user(
event, context
)

# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)
await self.store.add_push_actions_to_staging(
event.event_id,
actions_by_user,
count_as_unread,
)

async def get_action_for_event_by_user(
self, event: EventBase, context: EventContext
) -> Tuple[Dict[str, List[Union[dict, str]]], bool]:
count_as_unread = _should_count_as_unread(event, context)
rules_by_user = await self._get_rules_for_event(event, context)
actions_by_user: Dict[str, List[Union[dict, str]]] = {}

Expand Down Expand Up @@ -256,14 +271,7 @@ async def action_for_event_by_user(
actions_by_user[uid] = actions
break

# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)
await self.store.add_push_actions_to_staging(
event.event_id,
actions_by_user,
count_as_unread,
)
return actions_by_user, count_as_unread


def _condition_checker(
Expand Down
6 changes: 3 additions & 3 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.storage.databases.main.event_push_actions import HttpPushAction
from synapse.storage.databases.main.event_push_actions import BasePushAction

from . import push_rule_evaluator, push_tools

Expand Down Expand Up @@ -204,7 +204,7 @@ async def _unsafe_process(self) -> None:
"app_display_name": self.app_display_name,
},
):
processed = await self._process_one(push_action)
processed = await self.process_push_action(push_action)

if processed:
http_push_processed_counter.inc()
Expand Down Expand Up @@ -274,7 +274,7 @@ async def _unsafe_process(self) -> None:
)
break

async def _process_one(self, push_action: HttpPushAction) -> bool:
async def process_push_action(self, push_action: BasePushAction) -> bool:
if "notify" not in push_action["actions"]:
return True

Expand Down