Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stop the master relaying USER_SYNC for other workers #7318

Merged
merged 8 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7318.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move catchup of replication streams logic to worker.
6 changes: 1 addition & 5 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ Asks the server for the current position of all streams.

#### USER_SYNC (C)

A user has started or stopped syncing
A user has started or stopped syncing on this process.

#### CLEAR_USER_SYNC (C)

Expand All @@ -216,10 +216,6 @@ Asks the server for the current position of all streams.

Inform the server a cache should be invalidated

#### SYNC (S, C)

Used exclusively in tests

### REMOTE_SERVER_UP (S, C)

Inform other processes that a remote server may have come back online.
Expand Down
2 changes: 2 additions & 0 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class EventTypes(object):

Retention = "m.room.retention"

Presence = "m.presence"


class RejectedReason(object):
AUTH_ERROR = "auth_error"
Expand Down
85 changes: 49 additions & 36 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import contextlib
import logging
import sys
from typing import Dict, Iterable

from typing_extensions import ContextManager

from twisted.internet import defer, reactor
from twisted.web.resource import NoResource
Expand All @@ -38,14 +41,14 @@
from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.handlers.presence import BasePresenceHandler, get_interested_parties
from synapse.http.server import JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
Expand Down Expand Up @@ -221,23 +224,32 @@ async def on_POST(self, request, device_id):
return 200, {"one_time_key_counts": result}


class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""

def __exit__(self, exc_type, exc_val, exc_tb):
pass


UPDATE_SYNCING_USERS_MS = 10 * 1000


class GenericWorkerPresence(object):
class GenericWorkerPresence(BasePresenceHandler):
def __init__(self, hs):
super().__init__(hs)
self.hs = hs
self.is_mine_id = hs.is_mine_id
self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore()
self.user_to_num_current_syncs = {}
self.clock = hs.get_clock()

self._presence_enabled = hs.config.use_presence

# The number of ongoing syncs on this process, by user id.
# Empty if _presence_enabled is false.
self._user_to_num_current_syncs = {} # type: Dict[str, int]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a little confusing to me that GenericWorkerPresence has _user_to_num_current_syncs and PresenceHandler has user_to_num_current_syncs -- might be reasonable to abstract this? They also both use the code from the get_currently_syncing_users_for_replication method, but one inlines it? (I see that they need different implementations of that method...)


self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()

active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {state.user_id: state for state in active_presence}

# user_id -> last_sync_ms. Lists the users that have stopped syncing
# but we haven't notified the master of that yet
self.users_going_offline = {}
Expand All @@ -255,13 +267,13 @@ def __init__(self, hs):
)

def _on_shutdown(self):
if self.hs.config.use_presence:
if self._presence_enabled:
self.hs.get_tcp_replication().send_command(
ClearUserSyncsCommand(self.instance_id)
)

def send_user_sync(self, user_id, is_syncing, last_sync_ms):
if self.hs.config.use_presence:
if self._presence_enabled:
self.hs.get_tcp_replication().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
)
Expand Down Expand Up @@ -303,28 +315,33 @@ def set_state(self, user, state, ignore_status_msg=False):
# TODO How's this supposed to work?
return defer.succeed(None)

get_states = __func__(PresenceHandler.get_states)
get_state = __func__(PresenceHandler.get_state)
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
async def user_syncing(
self, user_id: str, affect_presence: bool
) -> ContextManager[None]:
"""Record that a user is syncing.

Called by the sync and events servlets to record that a user has connected to
this worker and is waiting for some events.
"""
if not affect_presence or not self._presence_enabled:
return _NullContextManager()

def user_syncing(self, user_id, affect_presence):
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
self._user_to_num_current_syncs[user_id] = curr_sync + 1
Comment on lines +329 to +330
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern usually pops out to me as a good opportunity to use defaultdict(int). It looks like it only occurs twice -- here and similar code in PresenceHandler though so not sure it would be worth it.


# If we went from no in flight sync to some, notify replication
if self.user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)
# If we went from no in flight sync to some, notify replication
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)

def _end():
# We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
if affect_presence and user_id in self.user_to_num_current_syncs:
self.user_to_num_current_syncs[user_id] -= 1
if user_id in self._user_to_num_current_syncs:
self._user_to_num_current_syncs[user_id] -= 1

# If we went from one in flight sync to none, notify replication
if self.user_to_num_current_syncs[user_id] == 0:
if self._user_to_num_current_syncs[user_id] == 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we don't remove the entry from _user_to_num_current_syncs at this point? Is it expected that they'll come back (and just cause churn if we remove it)? Removing them would simplify get_currently_syncing_users_for_replication a bit.

self.mark_as_going_offline(user_id)

@contextlib.contextmanager
Expand All @@ -334,7 +351,7 @@ def _user_syncing():
finally:
_end()

return defer.succeed(_user_syncing())
return _user_syncing()

@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
Expand Down Expand Up @@ -369,15 +386,12 @@ def process_replication_rows(self, token, rows):
stream_id = token
yield self.notify_from_replication(states, stream_id)

def get_currently_syncing_users(self):
if self.hs.config.use_presence:
return [
user_id
for user_id, count in self.user_to_num_current_syncs.items()
if count > 0
]
else:
return set()
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [
user_id
for user_id, count in self._user_to_num_current_syncs.items()
if count > 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PresenceHandler code this is simply if count, might make sense to keep them consistent.

]


class GenericWorkerTyping(object):
Expand Down Expand Up @@ -619,8 +633,7 @@ def __init__(self, hs):

self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
# NB this is a SynchrotronPresence, not a normal PresenceHandler
self.presence_handler = hs.get_presence_handler()
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
self.notifier = hs.get_notifier()

self.notify_pushers = hs.config.start_pushers
Expand Down
20 changes: 12 additions & 8 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.utils import log_function
from synapse.types import UserID
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -97,6 +98,8 @@ async def get_stream(
explicit_room_id=room_id,
)

time_now = self.clock.time_msec()

# When the user joins a new room, or another user joins a currently
# joined room, we need to send down presence for those users.
to_add = []
Expand All @@ -112,19 +115,20 @@ async def get_stream(
users = await self.state.get_current_users_in_room(
event.room_id
)
states = await presence_handler.get_states(users, as_event=True)
to_add.extend(states)
else:
users = [event.state_key]

ev = await presence_handler.get_state(
UserID.from_string(event.state_key), as_event=True
)
to_add.append(ev)
states = await presence_handler.get_states(users)
to_add.extend(
{
"type": EventTypes.Presence,
"content": format_user_presence_state(state, time_now),
}
for state in states
)

events.extend(to_add)

time_now = self.clock.time_msec()

chunks = await self._event_serializer.serialize_events(
events,
time_now,
Expand Down
10 changes: 8 additions & 2 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,16 @@ async def get_presence():
return []

states = await presence_handler.get_states(
[m.user_id for m in room_members], as_event=True
[m.user_id for m in room_members]
)

return states
return [
{
"type": EventTypes.Presence,
"content": format_user_presence_state(s, time_now),
}
for s in states
]

async def get_receipts():
receipts = await self.store.get_linearized_receipts_for_room(
Expand Down
Loading