Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix cross-worker ratelimiting #16558

Merged
merged 8 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16558.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix ratelimiting of message sending when using workers, where the ratelimit would only be applied after most of the work has been done.
73 changes: 57 additions & 16 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,26 @@ async def create_and_send_nonmember_event(
raise ShadowBanError()

if ratelimit:
await self.request_ratelimiter.ratelimit(requester, update=False)
room_id = event_dict["room_id"]
try:
room_version = await self.store.get_room_version(room_id)
except NotFoundError:
# If the room doesn't exist.
raise AuthError(403, f"User {requester.user} not in room {room_id}")

if room_version.updated_redaction_rules:
redacts = event_dict["content"].get("redacts")
else:
redacts = event_dict.get("redacts")

is_admin_redaction = await self.is_admin_redaction(
event_type=event_dict["type"],
sender=event_dict["sender"],
redacts=redacts,
)
await self.request_ratelimiter.ratelimit(
requester, is_admin_redaction=is_admin_redaction, update=False
)

# We limit the number of concurrent event sends in a room so that we
# don't fork the DAG too much. If we don't limit then we can end up in
Expand Down Expand Up @@ -1508,6 +1527,18 @@ async def _persist_events(
first_event.room_id
)
if writer_instance != self._instance_name:
# Ratelimit before sending to the other event persister, to
# ensure that we correctly have ratelimits on both the event
# creators and event persisters.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# creators and event persiters.
# creators and event persisters.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are multiple creators going to the same persister (Is that possible?) then we might pass this rate check, but fail the one on the persister.

I think the opposite isn't true since persisters are sharded by room so we either have a 1-to-1 mapping or a many-to-1 mapping? But the creator could fill the bucket faster if for some reason the send_events below fails (e.g. the persister is offline for a period?)

I guess restarts would also make these go out of sync?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, these can totally get out of sync. However, if they do get out of sync then the worst that is going to happen is that you're being ratelimited when you're already over the threshold, so 🤷

if ratelimit:
for event, _ in events_and_context:
is_admin_redaction = await self.is_admin_redaction(
event.type, event.sender, event.redacts
)
await self.request_ratelimiter.ratelimit(
requester, is_admin_redaction=is_admin_redaction
)

try:
result = await self.send_events(
instance_name=writer_instance,
Expand Down Expand Up @@ -1538,6 +1569,7 @@ async def _persist_events(
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id

return event

event = await self.persist_and_notify_client_events(
Expand Down Expand Up @@ -1696,21 +1728,9 @@ async def persist_and_notify_client_events(
# can apply different ratelimiting. We do this by simply checking
# it's not a self-redaction (to avoid having to look up whether the
# user is actually admin or not).
is_admin_redaction = False
if event.type == EventTypes.Redaction:
assert event.redacts is not None

original_event = await self.store.get_event(
event.redacts,
redact_behaviour=EventRedactBehaviour.as_is,
get_prev_content=False,
allow_rejected=False,
allow_none=True,
)

is_admin_redaction = bool(
original_event and event.sender != original_event.sender
)
is_admin_redaction = await self.is_admin_redaction(
event.type, event.sender, event.redacts
)

await self.request_ratelimiter.ratelimit(
requester, is_admin_redaction=is_admin_redaction
Expand Down Expand Up @@ -1930,6 +1950,27 @@ async def _notify() -> None:

return persisted_events[-1]

async def is_admin_redaction(
    self, event_type: str, sender: str, redacts: Optional[str]
) -> bool:
    """Decide whether an event counts as an "admin" redaction, which is
    ratelimited separately from ordinary events.

    The check is deliberately cheap: rather than looking up whether the
    sender really is an admin, we only test that the redaction is not a
    self-redaction, i.e. that the redacted event was sent by someone else.

    Args:
        event_type: The type of the event being sent.
        sender: The sender of the event being sent.
        redacts: The ID of the event being redacted. Must not be None when
            `event_type` is a redaction.

    Returns:
        True if the event is a redaction of an existing event that was sent
        by a different user, False otherwise (including when the event is
        not a redaction, or the target event could not be fetched).
    """
    is_redaction = event_type == EventTypes.Redaction
    if not is_redaction:
        # Only redactions can ever be treated as admin redactions.
        return False

    assert redacts is not None

    # `allow_none=True` means an unknown target gives us None instead of
    # raising, which we then treat as "not an admin redaction".
    redacted_event = await self.store.get_event(
        redacts,
        redact_behaviour=EventRedactBehaviour.as_is,
        get_prev_content=False,
        allow_rejected=False,
        allow_none=True,
    )
    if not redacted_event:
        return False

    # Anything other than a self-redaction is counted as an admin redaction.
    return bool(sender != redacted_event.sender)

async def _maybe_kick_guest_users(
self, event: EventBase, context: EventContext
) -> None:
Expand Down