diff --git a/changelog.d/14070.misc b/changelog.d/14070.misc new file mode 100644 index 000000000000..6e3d1c5af87d --- /dev/null +++ b/changelog.d/14070.misc @@ -0,0 +1 @@ +Batch up initial power level event when creating rooms. diff --git a/synapse/event_auth.py b/synapse/event_auth.py index c7d5ef92fc00..bab31e33c5e3 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -15,7 +15,18 @@ import logging import typing -from typing import Any, Collection, Dict, Iterable, List, Optional, Set, Tuple, Union +from typing import ( + Any, + Collection, + Dict, + Iterable, + List, + Mapping, + Optional, + Set, + Tuple, + Union, +) from canonicaljson import encode_canonical_json from signedjson.key import decode_verify_key_bytes @@ -134,6 +145,7 @@ def validate_event_for_room_version(event: "EventBase") -> None: async def check_state_independent_auth_rules( store: _EventSourceStore, event: "EventBase", + batched_auth_events: Optional[Mapping[str, "EventBase"]] = None, ) -> None: """Check that an event complies with auth rules that are independent of room state @@ -143,6 +155,8 @@ async def check_state_independent_auth_rules( Args: store: the datastore; used to fetch the auth events for validation event: the event being checked. + batched_auth_events: if the event being authed is part of a batch, any events + from the same batch that may be necessary to auth the current event Raises: AuthError if the checks fail @@ -162,6 +176,9 @@ async def check_state_independent_auth_rules( redact_behaviour=EventRedactBehaviour.as_is, allow_rejected=True, ) + if batched_auth_events: + auth_events.update(batched_auth_events) + room_id = event.room_id auth_dict: MutableStateMap[str] = {} expected_auth_types = auth_types_for_event(event.room_version, event) diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py index 8249ca1ed26c..3bbad0271bcc 100644 --- a/synapse/handlers/event_auth.py +++ b/synapse/handlers/event_auth.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Collection, List, Optional, Union +from typing import TYPE_CHECKING, Collection, List, Mapping, Optional, Union from synapse import event_auth from synapse.api.constants import ( @@ -29,7 +29,6 @@ ) from synapse.events import EventBase from synapse.events.builder import EventBuilder -from synapse.events.snapshot import EventContext from synapse.types import StateMap, get_domain_from_id if TYPE_CHECKING: @@ -51,12 +50,21 @@ def __init__(self, hs: "HomeServer"): async def check_auth_rules_from_context( self, event: EventBase, - context: EventContext, + batched_auth_events: Optional[Mapping[str, EventBase]] = None, ) -> None: - """Check an event passes the auth rules at its own auth events""" - await check_state_independent_auth_rules(self._store, event) + """Check an event passes the auth rules at its own auth events + Args: + event: event to be authed + batched_auth_events: if the event being authed is part of a batch, any events + from the same batch that may be necessary to auth the current event + """ + await check_state_independent_auth_rules( + self._store, event, batched_auth_events + ) auth_event_ids = event.auth_event_ids() auth_events_by_id = await self._store.get_events(auth_event_ids) + if batched_auth_events: + auth_events_by_id.update(batched_auth_events) check_state_dependent_auth_rules(event, auth_events_by_id.values()) def compute_auth_events( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 986ffed3d592..3f2eed07536c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -928,7 +928,7 @@ async def on_make_join_request( # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_join_request` - await self._event_auth_handler.check_auth_rules_from_context(event, context) + await self._event_auth_handler.check_auth_rules_from_context(event) return event async def on_invite_request( @@ -1003,7 +1003,9 @@ async def on_invite_request( context = EventContext.for_outlier(self._storage_controllers) - await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context) + await self._bulk_push_rule_evaluator.action_for_event_by_user( + [(event, context)] + ) try: await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] @@ -1109,7 +1111,7 @@ async def on_make_leave_request( try: # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_leave_request` - await self._event_auth_handler.check_auth_rules_from_context(event, context) + await self._event_auth_handler.check_auth_rules_from_context(event) except AuthError as e: logger.warning("Failed to create new leave %r because %s", event, e) raise e @@ -1168,7 +1170,7 @@ async def on_make_knock_request( try: # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_knock_request` - await self._event_auth_handler.check_auth_rules_from_context(event, context) + await self._event_auth_handler.check_auth_rules_from_context(event) except AuthError as e: logger.warning("Failed to create new knock %r because %s", event, e) raise e @@ -1334,9 +1336,7 @@ async def exchange_third_party_invite( try: validate_event_for_room_version(event) - await self._event_auth_handler.check_auth_rules_from_context( - event, context - ) + await self._event_auth_handler.check_auth_rules_from_context(event) except AuthError as e: logger.warning("Denying new third party invite %r because %s", event, e) raise e @@ -1386,7 +1386,7 @@ async def on_exchange_third_party_invite_request( try: validate_event_for_room_version(event) - await self._event_auth_handler.check_auth_rules_from_context(event, context) + await self._event_auth_handler.check_auth_rules_from_context(event) except AuthError as e: logger.warning("Denying third party invite %r because %s", event, e) raise e diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index da319943cc19..9be7c094c4bf 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -2118,7 +2118,7 @@ async def _run_push_actions_and_persist_event( ) else: await self._bulk_push_rule_evaluator.action_for_event_by_user( - event, context + [(event, context)] ) try: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index da1acea2755b..05bbbb331a9f 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1360,8 +1360,16 @@ async def handle_new_client_event( else: try: validate_event_for_room_version(event) + # If we are persisting a batch of events the event(s) needed to auth the + # current event may be part of the batch and will not be in the DB yet + event_id_to_event = {e.event_id: e for e, _ in events_and_context} + batched_auth_events = {} + for event_id in event.auth_event_ids(): + auth_event = event_id_to_event.get(event_id) + if auth_event: + batched_auth_events[event_id] = auth_event await self._event_auth_handler.check_auth_rules_from_context( - event, context + event, batched_auth_events ) except AuthError as err: logger.warning("Denying new event %r because %s", event, err) @@ -1424,18 +1432,10 @@ async def _persist_events( PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ - - for event, context in events_and_context: - # Skip push notification actions for historical messages - # because we don't want to notify people about old history back in time. - # The historical messages also do not have the proper `context.current_state_ids` - # and `state_groups` because they have `prev_events` that aren't persisted yet - # (historical messages persisted in reverse-chronological order). - if not event.internal_metadata.is_historical(): - with opentracing.start_active_span("calculate_push_actions"): - await self._bulk_push_rule_evaluator.action_for_event_by_user( - event, context - ) + with opentracing.start_active_span("calculate_push_actions"): + await self._bulk_push_rule_evaluator.action_for_event_by_user( + events_and_context + ) try: # If we're a worker we need to hit out to the master. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 57ab05ad25d9..f6030a7ad50c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -229,9 +229,7 @@ async def upgrade_room( }, ) validate_event_for_room_version(tombstone_event) - await self._event_auth_handler.check_auth_rules_from_context( - tombstone_event, tombstone_context - ) + await self._event_auth_handler.check_auth_rules_from_context(tombstone_event) # Upgrade the room # @@ -1159,6 +1157,7 @@ async def send( depth += 1 state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id + events_to_send = [] # We treat the power levels override specially as this needs to be one # of the first events that get sent into a room. pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None) @@ -1167,7 +1166,7 @@ async def send( EventTypes.PowerLevels, pl_content, False ) current_state_group = power_context._state_group - await send(power_event, power_context, creator) + events_to_send.append((power_event, power_context)) else: power_level_content: JsonDict = { "users": {creator_id: 100}, @@ -1216,9 +1215,8 @@ async def send( False, ) current_state_group = pl_context._state_group - await send(pl_event, pl_context, creator) + events_to_send.append((pl_event, pl_context)) - events_to_send = [] if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: room_alias_event, room_alias_context = await create_event( EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index eced182fd571..2ecfdf2951e6 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -171,7 +171,10 @@ async def _get_rules_for_event( return rules_by_user async def _get_power_levels_and_sender_level( - self, event: EventBase, context: EventContext + self, + event: EventBase, + context: EventContext, + event_id_to_event: Mapping[str, EventBase], ) -> Tuple[dict, Optional[int]]: # There are no power levels and sender levels possible to get from outlier if event.internal_metadata.is_outlier(): @@ -183,15 +186,26 @@ async def _get_power_levels_and_sender_level( ) pl_event_id = prev_state_ids.get(POWER_KEY) + # fastpath: if there's a power level event, that's all we need, and + # not having a power level event is an extreme edge case if pl_event_id: - # fastpath: if there's a power level event, that's all we need, and - # not having a power level event is an extreme edge case - auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)} + # check that the power level event is not in the batch before checking the DB + pl_event = event_id_to_event.get(pl_event_id) + if pl_event: + auth_events = {POWER_KEY: pl_event} + else: + auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)} else: auth_events_ids = self._event_auth_handler.compute_auth_events( event, prev_state_ids, for_verification=False ) auth_events_dict = await self.store.get_events(auth_events_ids) + # check to see that there aren't any needed auth events in the batch as it + # hasn't been persisted yet + for auth_event_id in auth_events_ids: + auth_event = event_id_to_event.get(auth_event_id) + if auth_event: + auth_events_dict[auth_event_id] = auth_event auth_events = {(e.type, e.state_key): e for e in auth_events_dict.values()} sender_level = get_user_power_level(event.sender, auth_events) @@ -247,132 +261,148 @@ async def _get_mutual_relations( @measure_func("action_for_event_by_user") async def action_for_event_by_user( - self, event: EventBase, context: EventContext + self, events_and_context: List[Tuple[EventBase, EventContext]] ) -> None: - """Given an event and context, evaluate the push rules, check if the message - should increment the unread count, and insert the results into the - event_push_actions_staging table. + """Given a list of events and their associated contexts, evaluate the push rules + for each event, check if the message should increment the unread count, and + insert the results into the event_push_actions_staging table. """ - if not event.internal_metadata.is_notifiable(): - # Push rules for events that aren't notifiable can't be processed by this - return - - # Disable counting as unread unless the experimental configuration is - # enabled, as it can cause additional (unwanted) rows to be added to the - # event_push_actions table. - count_as_unread = False - if self.hs.config.experimental.msc2654_enabled: - count_as_unread = _should_count_as_unread(event, context) - - rules_by_user = await self._get_rules_for_event(event) - actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {} - - room_member_count = await self.store.get_number_joined_users_in_room( - event.room_id - ) + for event, context in events_and_context: + if not event.internal_metadata.is_notifiable(): + # Push rules for events that aren't notifiable can't be processed by this + return + # Skip push notification actions for historical messages + # because we don't want to notify people about old history back in time. + # The historical messages also do not have the proper `context.current_state_ids` + # and `state_groups` because they have `prev_events` that aren't persisted yet + # (historical messages persisted in reverse-chronological order). + if event.internal_metadata.is_historical(): + return + + # Disable counting as unread unless the experimental configuration is + # enabled, as it can cause additional (unwanted) rows to be added to the + # event_push_actions table. + count_as_unread = False + if self.hs.config.experimental.msc2654_enabled: + count_as_unread = _should_count_as_unread(event, context) + + rules_by_user = await self._get_rules_for_event(event) + actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {} + + room_member_count = await self.store.get_number_joined_users_in_room( + event.room_id + ) - ( - power_levels, - sender_power_level, - ) = await self._get_power_levels_and_sender_level(event, context) - - relation = relation_from_event(event) - # If the event does not have a relation, then cannot have any mutual - # relations or thread ID. - relations = {} - thread_id = MAIN_TIMELINE - if relation: - relations = await self._get_mutual_relations( - relation.parent_id, - itertools.chain(*(r.rules() for r in rules_by_user.values())), + # For batched events the power level events may not have been persisted yet, + # so we pass in the batched events. Thus if the event cannot be found in the + # database we can check in the batch. + event_id_to_event = {e.event_id: e for e, _ in events_and_context} + ( + power_levels, + sender_power_level, + ) = await self._get_power_levels_and_sender_level( + event, context, event_id_to_event ) - # Recursively attempt to find the thread this event relates to. - if relation.rel_type == RelationTypes.THREAD: - thread_id = relation.parent_id - else: - # Since the event has not yet been persisted we check whether - # the parent is part of a thread. - thread_id = await self.store.get_thread_id(relation.parent_id) or "main" - - # It's possible that old room versions have non-integer power levels (floats or - # strings). Workaround this by explicitly converting to int. - notification_levels = power_levels.get("notifications", {}) - if not event.room_version.msc3667_int_only_power_levels: - for user_id, level in notification_levels.items(): - notification_levels[user_id] = int(level) - - evaluator = PushRuleEvaluator( - _flatten_dict(event), - room_member_count, - sender_power_level, - notification_levels, - relations, - self._relations_match_enabled, - ) - users = rules_by_user.keys() - profiles = await self.store.get_subset_users_in_room_with_profiles( - event.room_id, users - ) + relation = relation_from_event(event) + # If the event does not have a relation, then cannot have any mutual + # relations or thread ID. + relations = {} + thread_id = MAIN_TIMELINE + if relation: + relations = await self._get_mutual_relations( + relation.parent_id, + itertools.chain(*(r.rules() for r in rules_by_user.values())), + ) + # Recursively attempt to find the thread this event relates to. + if relation.rel_type == RelationTypes.THREAD: + thread_id = relation.parent_id + else: + # Since the event has not yet been persisted we check whether + # the parent is part of a thread. + thread_id = ( + await self.store.get_thread_id(relation.parent_id) or "main" + ) + + # It's possible that old room versions have non-integer power levels (floats or + # strings). Workaround this by explicitly converting to int. + notification_levels = power_levels.get("notifications", {}) + if not event.room_version.msc3667_int_only_power_levels: + for user_id, level in notification_levels.items(): + notification_levels[user_id] = int(level) + + evaluator = PushRuleEvaluator( + _flatten_dict(event), + room_member_count, + sender_power_level, + notification_levels, + relations, + self._relations_match_enabled, + ) - for uid, rules in rules_by_user.items(): - if event.sender == uid: - continue + users = rules_by_user.keys() + profiles = await self.store.get_subset_users_in_room_with_profiles( + event.room_id, users + ) - display_name = None - profile = profiles.get(uid) - if profile: - display_name = profile.display_name - - if not display_name: - # Handle the case where we are pushing a membership event to - # that user, as they might not be already joined. - if event.type == EventTypes.Member and event.state_key == uid: - display_name = event.content.get("displayname", None) - if not isinstance(display_name, str): - display_name = None - - if count_as_unread: - # Add an element for the current user if the event needs to be marked as - # unread, so that add_push_actions_to_staging iterates over it. - # If the event shouldn't be marked as unread but should notify the - # current user, it'll be added to the dict later. - actions_by_user[uid] = [] - - actions = evaluator.run(rules, uid, display_name) - if "notify" in actions: - # Push rules say we should notify the user of this event - actions_by_user[uid] = actions - - # If there aren't any actions then we can skip the rest of the - # processing. - if not actions_by_user: - return - - # This is a check for the case where user joins a room without being - # allowed to see history, and then the server receives a delayed event - # from before the user joined, which they should not be pushed for - # - # We do this *after* calculating the push actions as a) its unlikely - # that we'll filter anyone out and b) for large rooms its likely that - # most users will have push disabled and so the set of users to check is - # much smaller. - uids_with_visibility = await filter_event_for_clients_with_state( - self.store, actions_by_user.keys(), event, context - ) + for uid, rules in rules_by_user.items(): + if event.sender == uid: + continue - for user_id in set(actions_by_user).difference(uids_with_visibility): - actions_by_user.pop(user_id, None) - - # Mark in the DB staging area the push actions for users who should be - # notified for this event. (This will then get handled when we persist - # the event) - await self.store.add_push_actions_to_staging( - event.event_id, - actions_by_user, - count_as_unread, - thread_id, - ) + display_name = None + profile = profiles.get(uid) + if profile: + display_name = profile.display_name + + if not display_name: + # Handle the case where we are pushing a membership event to + # that user, as they might not be already joined. + if event.type == EventTypes.Member and event.state_key == uid: + display_name = event.content.get("displayname", None) + if not isinstance(display_name, str): + display_name = None + + if count_as_unread: + # Add an element for the current user if the event needs to be marked as + # unread, so that add_push_actions_to_staging iterates over it. + # If the event shouldn't be marked as unread but should notify the + # current user, it'll be added to the dict later. + actions_by_user[uid] = [] + + actions = evaluator.run(rules, uid, display_name) + if "notify" in actions: + # Push rules say we should notify the user of this event + actions_by_user[uid] = actions + + # If there aren't any actions then we can skip the rest of the + # processing. + if not actions_by_user: + return + + # This is a check for the case where user joins a room without being + # allowed to see history, and then the server receives a delayed event + # from before the user joined, which they should not be pushed for + # + # We do this *after* calculating the push actions as a) its unlikely + # that we'll filter anyone out and b) for large rooms its likely that + # most users will have push disabled and so the set of users to check is + # much smaller. + uids_with_visibility = await filter_event_for_clients_with_state( + self.store, actions_by_user.keys(), event, context + ) + + for user_id in set(actions_by_user).difference(uids_with_visibility): + actions_by_user.pop(user_id, None) + + # Mark in the DB staging area the push actions for users who should be + # notified for this event. (This will then get handled when we persist + # the event) + await self.store.add_push_actions_to_staging( + event.event_id, + actions_by_user, + count_as_unread, + thread_id, + ) MemberMap = Dict[str, Optional[EventIdMembership]] diff --git a/tests/push/test_bulk_push_rule_evaluator.py b/tests/push/test_bulk_push_rule_evaluator.py index 675d7df2ac45..4d8bee3fda8e 100644 --- a/tests/push/test_bulk_push_rule_evaluator.py +++ b/tests/push/test_bulk_push_rule_evaluator.py @@ -71,4 +71,4 @@ def test_action_for_event_by_user_handles_noninteger_power_levels(self) -> None: bulk_evaluator = BulkPushRuleEvaluator(self.hs) # should not raise - self.get_success(bulk_evaluator.action_for_event_by_user(event, context)) + self.get_success(bulk_evaluator.action_for_event_by_user([(event, context)])) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index ce53f808db9a..121f3d8d6517 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -371,7 +371,7 @@ def make_worker_hs( config=worker_hs.config.server.listeners[0], resource=resource, server_version_string="1", - max_request_body_size=4096, + max_request_body_size=8192, reactor=self.reactor, ) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 3612ebe7b961..8e67c207e7e0 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -711,7 +711,7 @@ def test_post_room_no_keys(self) -> None: self.assertEqual(HTTPStatus.OK, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(34, channel.resource_usage.db_txn_count) + self.assertEqual(31, channel.resource_usage.db_txn_count) def test_post_room_initial_state(self) -> None: # POST with initial_state config key, expect new room id @@ -724,7 +724,7 @@ def test_post_room_initial_state(self) -> None: self.assertEqual(HTTPStatus.OK, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(37, channel.resource_usage.db_txn_count) + self.assertEqual(33, channel.resource_usage.db_txn_count) def test_post_room_visibility_key(self) -> None: # POST with visibility config key, expect new room id