Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Wake up transaction queue when remote server comes back online (#6706)
Browse files Browse the repository at this point in the history
* commit 'a8a50f5b5':
  Wake up transaction queue when remote server comes back online (#6706)
  • Loading branch information
anoadragon453 committed Mar 23, 2020
2 parents 09cdecd + a8a50f5 commit bd6344c
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 8 deletions.
1 change: 1 addition & 0 deletions changelog.d/6706.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Attempt to retry sending a transaction when we detect a remote server has come back online, rather than waiting for a transaction to be triggered by new data.
6 changes: 5 additions & 1 deletion docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ Where `<token>` may be either:
* a numeric stream_id to stream updates since (exclusive)
* `NOW` to stream all subsequent updates.

The `<stream_name>` is the name of a replication stream to subscribe
The `<stream_name>` is the name of a replication stream to subscribe
to (see [here](../synapse/replication/tcp/streams/_base.py) for a list
of streams). It can also be `ALL` to subscribe to all known streams,
in which case the `<token>` must be set to `NOW`.
Expand All @@ -234,6 +234,10 @@ in which case the `<token>` must be set to `NOW`.

Used exclusively in tests

### REMOTE_SERVER_UP (S, C)

Inform other processes that a remote server may have come back online.

See `synapse/replication/tcp/commands.py` for a detailed description and
the format of each command.

Expand Down
12 changes: 11 additions & 1 deletion synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ def get_streams_to_replicate(self):
args.update(self.send_handler.stream_positions())
return args

def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""

# Let's wake up the transaction queue for the server in case we have
# pending stuff to send to it.
self.send_handler.wake_destination(server)


def start(config_options):
try:
Expand Down Expand Up @@ -205,7 +212,7 @@ class FederationSenderHandler(object):
to the federation sender.
"""

def __init__(self, hs, replication_client):
def __init__(self, hs: FederationSenderServer, replication_client):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
Expand All @@ -226,6 +233,9 @@ def on_start(self):
self.store.get_room_max_stream_ordering()
)

def wake_destination(self, server: str):
self.federation_sender.wake_destination(server)

def stream_positions(self):
return {"federation": self.federation_position}

Expand Down
18 changes: 16 additions & 2 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from twisted.internet import defer

import synapse
import synapse.metrics
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
Expand Down Expand Up @@ -54,7 +55,7 @@


class FederationSender(object):
def __init__(self, hs):
def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs
self.server_name = hs.hostname

Expand Down Expand Up @@ -482,7 +483,20 @@ def send_edu(self, edu, key):

def send_device_messages(self, destination):
if destination == self.server_name:
logger.info("Not sending device update to ourselves")
logger.warning("Not sending device update to ourselves")
return

self._get_per_destination_queue(destination).attempt_new_transaction()

def wake_destination(self, destination: str):
"""Called when we want to retry sending transactions to a remote.
This is mainly useful if the remote server has been down and we think it
might have come back.
"""

if destination == self.server_name:
logger.warning("Not waking up ourselves")
return

self._get_per_destination_queue(destination).attempt_new_transaction()
Expand Down
19 changes: 18 additions & 1 deletion synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
tags,
whitelisted_homeserver,
)
from synapse.server import HomeServer
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -101,12 +102,17 @@ class NoAuthenticationError(AuthenticationError):


class Authenticator(object):
def __init__(self, hs):
def __init__(self, hs: HomeServer):
self._clock = hs.get_clock()
self.keyring = hs.get_keyring()
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
self.notifer = hs.get_notifier()

self.replication_client = None
if hs.config.worker.worker_app:
self.replication_client = hs.get_tcp_replication()

# A method just so we can pass 'self' as the authenticator to the Servlets
async def authenticate_request(self, request, content):
Expand Down Expand Up @@ -166,6 +172,17 @@ async def _reset_retry_timings(self, origin):
try:
logger.info("Marking origin %r as up", origin)
await self.store.set_destination_retry_timings(origin, None, 0, 0)

# Inform the relevant places that the remote server is back up.
self.notifer.notify_remote_server_up(origin)
if self.replication_client:
# If we're on a worker we try and inform master about this. The
# replication client doesn't hook into the notifier to avoid
# infinite loops where we send a `REMOTE_SERVER_UP` command to
# master, which then echoes it back to us which in turn pokes
# the notifier.
self.replication_client.send_remote_server_up(origin)

except Exception:
logger.exception("Error resetting retry timings on %s", origin)

Expand Down
31 changes: 28 additions & 3 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

import logging
from collections import namedtuple
from typing import Callable, List

from prometheus_client import Counter

from twisted.internet import defer

import synapse.server
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
Expand Down Expand Up @@ -154,7 +156,7 @@ class Notifier(object):

UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000

def __init__(self, hs):
def __init__(self, hs: "synapse.server.HomeServer"):
self.user_to_user_stream = {}
self.room_to_user_streams = {}

Expand All @@ -164,7 +166,12 @@ def __init__(self, hs):
self.store = hs.get_datastore()
self.pending_new_room_events = []

self.replication_callbacks = []
# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]

# Called when remote servers have come back online after having been
# down.
self.remote_server_up_callbacks = [] # type: List[Callable[[str], None]]

self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
Expand Down Expand Up @@ -205,14 +212,20 @@ def count_listeners():
"synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
)

def add_replication_callback(self, cb):
def add_replication_callback(self, cb: Callable[[], None]):
"""Add a callback that will be called when some new data is available.
Callback is not given any arguments. It should *not* return a Deferred - if
it needs to do any asynchronous work, a background thread should be started and
wrapped with run_as_background_process.
"""
self.replication_callbacks.append(cb)

def add_remote_server_up_callback(self, cb: Callable[[str], None]):
"""Add a callback that will be called when synapse detects a server
has been
"""
self.remote_server_up_callbacks.append(cb)

def on_new_room_event(
self, event, room_stream_id, max_room_stream_id, extra_users=[]
):
Expand Down Expand Up @@ -522,3 +535,15 @@ def notify_replication(self):
"""Notify the any replication listeners that there's a new event"""
for cb in self.replication_callbacks:
cb()

def notify_remote_server_up(self, server: str):
"""Notify any replication that a remote server has come back up
"""
# We call federation_sender directly rather than registering as a
# callback as a) we already have a reference to it and b) it introduces
# circular dependencies.
if self.federation_sender:
self.federation_sender.wake_destination(server)

for cb in self.remote_server_up_callbacks:
cb(server)
3 changes: 3 additions & 0 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ def on_sync(self, data):
if d:
d.callback(data)

def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""

def get_streams_to_replicate(self) -> Dict[str, int]:
"""Called when a new connection has been established and we need to
subscribe to streams.
Expand Down
17 changes: 17 additions & 0 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,20 @@ def to_line(self):
)


class RemoteServerUpCommand(Command):
"""Sent when a worker has detected that a remote server is no longer
"down" and retry timings should be reset.
If sent from a client the server will relay to all other workers.
Format::
REMOTE_SERVER_UP <server>
"""

NAME = "REMOTE_SERVER_UP"


_COMMANDS = (
ServerCommand,
RdataCommand,
Expand All @@ -401,6 +415,7 @@ def to_line(self):
RemovePusherCommand,
InvalidateCacheCommand,
UserIpCommand,
RemoteServerUpCommand,
) # type: Tuple[Type[Command], ...]

# Map of command name to command type.
Expand All @@ -414,6 +429,7 @@ def to_line(self):
ErrorCommand.NAME,
PingCommand.NAME,
SyncCommand.NAME,
RemoteServerUpCommand.NAME,
)

# The commands the client is allowed to send
Expand All @@ -427,4 +443,5 @@ def to_line(self):
InvalidateCacheCommand.NAME,
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
)
15 changes: 15 additions & 0 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
PingCommand,
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
ReplicateCommand,
ServerCommand,
SyncCommand,
Expand Down Expand Up @@ -460,6 +461,9 @@ async def on_REMOVE_PUSHER(self, cmd):
async def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)

async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.streamer.on_remote_server_up(cmd.data)

async def on_USER_IP(self, cmd):
self.streamer.on_user_ip(
cmd.user_id,
Expand Down Expand Up @@ -555,6 +559,9 @@ def stream_update(self, stream_name, token, data):
def send_sync(self, data):
self.send_command(SyncCommand(data))

def send_remote_server_up(self, server: str):
self.send_command(RemoteServerUpCommand(server))

def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
self.streamer.lost_connection(self)
Expand Down Expand Up @@ -588,6 +595,11 @@ def on_sync(self, data):
"""Called when get a new SYNC command."""
raise NotImplementedError()

@abc.abstractmethod
async def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
raise NotImplementedError()

@abc.abstractmethod
def get_streams_to_replicate(self):
"""Called when a new connection has been established and we need to
Expand Down Expand Up @@ -707,6 +719,9 @@ async def on_POSITION(self, cmd):
async def on_SYNC(self, cmd):
self.handler.on_sync(cmd.data)

async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.handler.on_remote_server_up(cmd.data)

def replicate(self, stream_name, token):
"""Send the subscription request to the server
"""
Expand Down
9 changes: 9 additions & 0 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(self, hs):
self.federation_sender = hs.get_federation_sender()

self.notifier.add_replication_callback(self.on_notifier_poke)
self.notifier.add_remote_server_up_callback(self.send_remote_server_up)

# Keeps track of whether we are currently checking for updates
self.is_looping = False
Expand Down Expand Up @@ -288,6 +289,14 @@ async def on_user_ip(
)
await self._server_notices_sender.on_user_ip(user_id)

@measure_func("repl.on_remote_server_up")
def on_remote_server_up(self, server: str):
self.notifier.notify_remote_server_up(server)

def send_remote_server_up(self, server: str):
for conn in self.connections:
conn.send_remote_server_up(server)

def send_sync_to_all_connections(self, data):
"""Sends a SYNC command to all clients.
Expand Down
12 changes: 12 additions & 0 deletions synapse/server.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import twisted.internet

import synapse.api.auth
import synapse.config.homeserver
import synapse.federation.sender
Expand All @@ -9,10 +11,12 @@ import synapse.handlers.deactivate_account
import synapse.handlers.device
import synapse.handlers.e2e_keys
import synapse.handlers.message
import synapse.handlers.presence
import synapse.handlers.room
import synapse.handlers.room_member
import synapse.handlers.set_password
import synapse.http.client
import synapse.notifier
import synapse.rest.media.v1.media_repository
import synapse.server_notices.server_notices_manager
import synapse.server_notices.server_notices_sender
Expand Down Expand Up @@ -85,3 +89,11 @@ class HomeServer(object):
self,
) -> synapse.server_notices.server_notices_sender.ServerNoticesSender:
pass
def get_notifier(self) -> synapse.notifier.Notifier:
pass
def get_presence_handler(self) -> synapse.handlers.presence.PresenceHandler:
pass
def get_clock(self) -> synapse.util.Clock:
pass
def get_reactor(self) -> twisted.internet.base.ReactorBase:
pass

0 comments on commit bd6344c

Please sign in to comment.