Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Correctly advance un_partial_stated_room_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jan 19, 2023
1 parent d24b947 commit c58774f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
8 changes: 5 additions & 3 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1727,14 +1727,16 @@ async def _sync_partial_state_room(

logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)

# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()

if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()

# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
Expand Down
9 changes: 9 additions & 0 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.config.homeserver import HomeServerConfig
from synapse.events import EventBase
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
Expand Down Expand Up @@ -140,6 +141,13 @@ def __init__(
db_conn, "un_partial_stated_room_stream", "stream_id"
)

async def process_replication_position(
self, stream_name: str, instance_name: str, token: int
) -> None:
if stream_name == UnPartialStatedRoomStream.NAME:
self._un_partial_stated_rooms_stream_id_gen.advance(instance_name, token)
return super().process_replication_position(stream_name, instance_name, token)

async def store_room(
self,
room_id: str,
Expand Down Expand Up @@ -2372,3 +2380,4 @@ def _clear_partial_state_room_txn(
WHERE stream_id <= ?
"""
txn.execute(sql, (device_lists_stream_id,))
txn.execute(sql, (device_lists_stream_id,))

0 comments on commit c58774f

Please sign in to comment.