Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stop the master relaying USER_SYNC for other workers #7318

Merged
merged 8 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ Asks the server for the current position of all streams.

#### USER_SYNC (C)

A user has started or stopped syncing
A user has started or stopped syncing on this process.

#### CLEAR_USER_SYNC (C)

Expand All @@ -216,10 +216,6 @@ Asks the server for the current position of all streams.

Inform the server a cache should be invalidated

#### SYNC (S, C)

Used exclusively in tests

### REMOTE_SERVER_UP (S, C)

Inform other processes that a remote server may have come back online.
Expand Down
6 changes: 3 additions & 3 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,12 @@ def process_replication_rows(self, token, rows):
stream_id = token
yield self.notify_from_replication(states, stream_id)

def get_currently_syncing_users(self) -> Set[str]:
return {
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [
user_id
for user_id, count in self._user_to_num_current_syncs.items()
if count > 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PresenceHandler code this is simply if count, might make sense to keep them consistent.

}
]


class GenericWorkerTyping(object):
Expand Down
40 changes: 20 additions & 20 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,15 @@ async def user_syncing(
"""

@abc.abstractmethod
def get_currently_syncing_users(self) -> Set[str]:
"""Get the set of user ids that are currently syncing on this HS.
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
"""Get the set of syncing users on this worker, to send to the presence handler
richvdh marked this conversation as resolved.
Show resolved Hide resolved

This is called when a replication connection is established. It should return
a list of user ids, which are then sent as USER_SYNC commands to inform the
process handling presence about those users.

Returns:
set(str): A set of user_id strings.
A set of user_id strings.
richvdh marked this conversation as resolved.
Show resolved Hide resolved
"""

async def get_state(self, target_user: UserID) -> UserPresenceState:
Expand Down Expand Up @@ -443,10 +448,18 @@ async def _handle_timeouts(self):

timers_fired_counter.inc(len(states))

syncing_user_ids = {
user_id
for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)

changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
syncing_user_ids=self.get_currently_syncing_users(),
syncing_user_ids=syncing_user_ids,
now=now,
)

Expand Down Expand Up @@ -544,22 +557,9 @@ def _user_syncing():

return _user_syncing()

def get_currently_syncing_users(self):
"""Get the set of user ids that are currently syncing on this HS.
Returns:
set(str): A set of user_id strings.
"""
if self.hs.config.use_presence:
syncing_user_ids = {
user_id
for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
else:
return set()
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
# since we are the process handling presence, there is nothing to do here.
return []

async def update_external_syncs_row(
self, process_id, user_id, is_syncing, sync_time_msec
Expand Down
7 changes: 5 additions & 2 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,18 @@ def to_line(self):

class UserSyncCommand(Command):
"""Sent by the client to inform the server that a user has started or
stopped syncing. Used to calculate presence on the master.
stopped syncing on this process.

This is used by the process handling presence (typically the master) to
calculate who is online and who is not.

Includes a timestamp of when the last user sync was.

Format::

USER_SYNC <instance_id> <user_id> <state> <last_sync_ms>

Where <state> is either "start" or "stop"
Where <state> is either "start" or "end"
"""

NAME = "USER_SYNC"
Expand Down
15 changes: 5 additions & 10 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,6 @@ async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
if self._is_master:
self._notifier.notify_remote_server_up(cmd.data)

def get_currently_syncing_users(self):
"""Get the list of currently syncing users (if any). This is called
when a connection has been established and we need to send the
currently syncing users.
"""
return self._presence_handler.get_currently_syncing_users()

def new_connection(self, connection: AbstractConnection):
"""Called when we have a new connection.
"""
Expand All @@ -325,9 +318,11 @@ def new_connection(self, connection: AbstractConnection):
if self._factory:
self._factory.resetDelay()

# Tell the server if we have any users currently syncing (should only
# happen on synchrotrons)
currently_syncing = self.get_currently_syncing_users()
# Tell the other end if we have any users currently syncing.
currently_syncing = (
self._presence_handler.get_currently_syncing_users_for_replication()
)

now = self._clock.time_msec()
for user_id in currently_syncing:
connection.send_command(
Expand Down