Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Have all instances correctly respond to REPLICATE command. #7475

Merged
merged 4 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7475.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Have all instance correctly respond to REPLICATE command.
4 changes: 2 additions & 2 deletions synapse/replication/http/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ def __init__(self, hs):

self._instance_name = hs.get_instance_name()

# We pull the streams from the replication steamer (if we try and make
# We pull the streams from the replication handler (if we try and make
# them ourselves we end up in an import loop).
self.streams = hs.get_replication_streamer().get_streams()
self.streams = hs.get_tcp_replication().get_streams()

@staticmethod
def _serialize_payload(stream_name, from_token, upto_token):
Expand Down
53 changes: 44 additions & 9 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@
UserSyncCommand,
)
from synapse.replication.tcp.protocol import AbstractConnection
from synapse.replication.tcp.streams import STREAMS_MAP, Stream
from synapse.replication.tcp.streams import (
STREAMS_MAP,
CachesStream,
FederationStream,
Stream,
)
from synapse.util.async_helpers import Linearizer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -73,6 +78,26 @@ def __init__(self, hs):
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
} # type: Dict[str, Stream]

# List of streams that this instance is the source of
self._streams_to_replicate = [] # type: List[Stream]

for stream in self._streams.values():
if stream.NAME == CachesStream.NAME:
# All workers can write to the cache invalidation stream.
self._streams_to_replicate.append(stream)
continue

# Only add any other streams if we're on master.
if hs.config.worker_app is not None:
continue

if stream.NAME == FederationStream.NAME and hs.config.send_federation:
# We only support federation stream if federation sending
# has been disabled on the master.
continue

self._streams_to_replicate.append(stream)

self._position_linearizer = Linearizer(
"replication_position", clock=self._clock
)
Expand Down Expand Up @@ -150,16 +175,26 @@ def start_replication(self, hs):
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self._factory)

async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
# We only want to announce positions by the writer of the streams.
# Currently this is just the master process.
if not self._is_master:
return
def get_streams(self) -> Dict[str, Stream]:
"""Get a map from stream name to all streams.
"""
return self._streams

def get_streams_to_replicate(self) -> List[Stream]:
"""Get a list of streams that this instances replicates.
"""
return self._streams_to_replicate

for stream_name, stream in self._streams.items():
current_token = stream.current_token(self._instance_name)
async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
# We respond with current position of all streams this instance
# replicates.
for stream in self.get_streams_to_replicate():
self.send_command(
PositionCommand(stream_name, self._instance_name, current_token)
PositionCommand(
stream.NAME,
self._instance_name,
stream.current_token(self._instance_name),
)
)

async def on_USER_SYNC(self, conn: AbstractConnection, cmd: UserSyncCommand):
Expand Down
39 changes: 3 additions & 36 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,13 @@

import logging
import random
from typing import Dict, List

from prometheus_client import Counter

from twisted.internet.protocol import Factory

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
from synapse.replication.tcp.streams import (
STREAMS_MAP,
CachesStream,
FederationStream,
Stream,
)
from synapse.util.metrics import Measure

stream_updates_counter = Counter(
Expand Down Expand Up @@ -80,42 +73,16 @@ def __init__(self, hs):

self._replication_torture_level = hs.config.replication_torture_level

# Work out list of streams that this instance is the source of.
self.streams = [] # type: List[Stream]

# All workers can write to the cache invalidation stream.
self.streams.append(CachesStream(hs))

if hs.config.worker_app is None:
for stream in STREAMS_MAP.values():
if stream == FederationStream and hs.config.send_federation:
# We only support federation stream if federation sending
# has been disabled on the master.
continue

if stream == CachesStream:
# We've already added it above.
continue

self.streams.append(stream(hs))

self.streams_by_name = {stream.NAME: stream for stream in self.streams}

# Only bother registering the notifier callback if we have streams to
# publish.
if self.streams:
self.notifier.add_replication_callback(self.on_notifier_poke)
self.notifier.add_replication_callback(self.on_notifier_poke)

# Keeps track of whether we are currently checking for updates
self.is_looping = False
self.pending_updates = False

self.command_handler = hs.get_tcp_replication()

def get_streams(self) -> Dict[str, Stream]:
"""Get a mapp from stream name to stream instance.
"""
return self.streams_by_name
# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()

def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the
Expand Down