Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add ability to wait for replication streams #7542

Merged
merged 9 commits into from
May 22, 2020
21 changes: 16 additions & 5 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,10 +1298,18 @@ async def do_invite_join(
room_id=room_id, room_version=room_version_obj,
)

await self._persist_auth_tree(
max_stream_id = await self._persist_auth_tree(
origin, auth_chain, state, event, room_version_obj
)

# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below use caches.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
"master", "events", max_stream_id
)

# Check whether this room is the result of an upgrade of a room we already know
# about. If so, migrate over user information
predecessor = await self.store.get_room_predecessor(room_id)
Expand Down Expand Up @@ -1882,7 +1890,7 @@ async def _persist_auth_tree(
state: List[EventBase],
event: EventBase,
room_version: RoomVersion,
) -> None:
) -> int:
"""Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically.
Persists the event separately. Notifies about the persisted events
Expand Down Expand Up @@ -1976,7 +1984,7 @@ async def _persist_auth_tree(
event, old_state=state
)

await self.persist_events_and_notify([(event, new_event_context)])
return await self.persist_events_and_notify([(event, new_event_context)])

async def _prep_event(
self,
Expand Down Expand Up @@ -2829,7 +2837,7 @@ async def persist_events_and_notify(
self,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> None:
) -> int:
"""Persists events and tells the notifier/pushers about them, if
necessary.

Expand All @@ -2839,11 +2847,12 @@ async def persist_events_and_notify(
backfilling or not
"""
if self.config.worker_app:
await self._send_events_to_master(
result = await self._send_events_to_master(
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
return result["max_stream_id"]
else:
max_stream_id = await self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
Expand All @@ -2858,6 +2867,8 @@ async def persist_events_and_notify(
for event, _ in event_and_contexts:
await self._notify_persisted_event(event, max_stream_id)

return max_stream_id

async def _notify_persisted_event(
self, event: EventBase, max_stream_id: int
) -> None:
Expand Down
13 changes: 10 additions & 3 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"""Handles events newly received from federation, including persisting and
notifying.
notifying. Returns the maximum stream ID of the persisted events.

The API looks like:

Expand All @@ -46,6 +46,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"context": { .. serialized event context .. },
}],
"backfilled": false
}

200 OK

{
"max_stream_id": 32443,
}
"""

NAME = "fed_send_events"
Expand Down Expand Up @@ -115,11 +122,11 @@ async def _handle_request(self, request):

logger.info("Got %d events from federation", len(event_and_contexts))

await self.federation_handler.persist_events_and_notify(
max_stream_id = await self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled
)

return 200, {}
return 200, {"max_stream_id": max_stream_id}


class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
Expand Down