Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Improve code documentation for the typing stream over replication. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
reivilibre authored Mar 11, 2022
1 parent 735e89b commit 4a53f35
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 6 deletions.
1 change: 1 addition & 0 deletions changelog.d/12211.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve code documentation for the typing stream over replication.
5 changes: 3 additions & 2 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ def process_replication_rows(
"""Should be called whenever we receive updates for typing stream."""

if self._latest_room_serial > token:
# The master has gone backwards. To prevent inconsistent data, just
# clear everything.
# The typing worker has gone backwards (e.g. it may have restarted).
# To prevent inconsistent data, just clear everything.
logger.info("Typing handler stream went backwards; resetting")
self._reset()

# Set the latest serial token to whatever the server gave us.
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ def send_remote_server_up(self, server: str) -> None:
self.send_command(RemoteServerUpCommand(server))

def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None:
"""Called when a new update is available to stream to clients.
"""Called when a new update is available to stream to Redis subscribers.
We need to check if the client is interested in the stream or not
"""
Expand Down
6 changes: 3 additions & 3 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def buildProtocol(self, addr: IAddress) -> ServerReplicationStreamProtocol:
class ReplicationStreamer:
"""Handles replication connections.
This needs to be poked when new replication data may be available. When new
data is available it will propagate to all connected clients.
This needs to be poked when new replication data may be available.
When new data is available it will propagate to all Redis subscribers.
"""

def __init__(self, hs: "HomeServer"):
Expand Down Expand Up @@ -109,7 +109,7 @@ def __init__(self, hs: "HomeServer"):

def on_notifier_poke(self) -> None:
"""Checks if there is actually any new data and sends it to the
connections if there are.
Redis subscribers if there are.
This should get called each time new data is available, even if it
is currently being executed, so that nothing gets missed
Expand Down
12 changes: 12 additions & 0 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,19 @@ def __init__(self, hs: "HomeServer"):
class TypingStream(Stream):
@attr.s(slots=True, frozen=True, auto_attribs=True)
class TypingStreamRow:
"""
An entry in the typing stream.
Describes all the users that are 'typing' right now in one room.
When a user stops typing, it will be streamed as a new update with that
user absent; you can think of the `user_ids` list as overwriting the
entire list that was there previously.
"""

# The room that this update is for.
room_id: str

# All the users that are 'typing' right now in the specified room.
user_ids: List[str]

NAME = "typing"
Expand Down

0 comments on commit 4a53f35

Please sign in to comment.