Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add ability to run replication protocol over redis. #7040

Merged
merged 15 commits into from
Apr 22, 2020
1 change: 1 addition & 0 deletions changelog.d/7040.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for running replication over Redis when using workers.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,6 @@ ignore_missing_imports = True

[mypy-jwt.*]
ignore_missing_imports = True

[mypy-txredisapi]
ignore_missing_imports = True
richvdh marked this conversation as resolved.
Show resolved Hide resolved
7 changes: 7 additions & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ def _configure_named_resource(self, name, compress=False):
def start_listening(self, listeners):
config = self.get_config()

if config.redis_enabled:
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
# rather than a server).
self.get_tcp_replication().start_replication(self)

for listener in listeners:
if listener["type"] == "http":
self._listening_services.extend(self._listener_http(config, listener))
Expand All @@ -292,6 +298,7 @@ def start_listening(self, listeners):
)
for s in services:
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
Expand Down
2 changes: 2 additions & 0 deletions synapse/config/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .password_auth_providers import PasswordAuthProviderConfig
from .push import PushConfig
from .ratelimiting import RatelimitConfig
from .redis import RedisConfig
from .registration import RegistrationConfig
from .repository import ContentRepositoryConfig
from .room_directory import RoomDirectoryConfig
Expand Down Expand Up @@ -82,4 +83,5 @@ class HomeServerConfig(RootConfig):
RoomDirectoryConfig,
ThirdPartyRulesConfig,
TracerConfig,
RedisConfig,
]
40 changes: 40 additions & 0 deletions synapse/config/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.config._base import Config
from synapse.python_dependencies import check_requirements

try:
import txredisapi
except ImportError:
txredisapi = None
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved


class RedisConfig(Config):
section = "redis"

def read_config(self, config, **kwargs):
redis_config = config.get("redis", {})
self.redis_enabled = redis_config.get("enabled", False)

if not self.redis_enabled:
return

check_requirements("redis")

self.redis_host = redis_config.get("host", "localhost")
self.redis_port = redis_config.get("port", 6379)
self.redis_dbid = redis_config.get("dbid")
self.redis_password = redis_config.get("password")
1 change: 1 addition & 0 deletions synapse/python_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"sentry": ["sentry-sdk>=0.7.2"],
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
"jwt": ["pyjwt>=1.6.4"],
"redis": ["txredisapi>=1.4.7"],
}

ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
logger = logging.getLogger(__name__)


class ReplicationClientFactory(ReconnectingClientFactory):
class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
"""Factory for building connections to the master. Will reconnect if the
connection is lost.

Expand Down
25 changes: 18 additions & 7 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from prometheus_client import Counter

from synapse.metrics import LaterGauge
from synapse.replication.tcp.client import ReplicationClientFactory
from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
Expand Down Expand Up @@ -82,7 +82,7 @@ def __init__(self, hs):
self._pending_batches = {} # type: Dict[str, List[Any]]

# The factory used to create connections.
self._factory = None # type: Optional[ReplicationClientFactory]
self._factory = None # type: Optional[DirectTcpReplicationClientFactory]
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

# The currently connected connections.
self._connections = [] # type: List[AbstractConnection]
Expand All @@ -109,11 +109,22 @@ def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
using TCP.
"""
client_name = hs.config.worker_name
self._factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self._factory)
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
)

logger.info("Connecting to redis.")
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
self._factory = RedisDirectTcpReplicationClientFactory(hs)
hs.get_reactor().connectTCP(
hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
)
else:
client_name = hs.config.worker_name
self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self._factory)

async def on_REPLICATE(self, cmd: ReplicateCommand):
# We only want to announce positions by the writer of the streams.
Expand Down
178 changes: 178 additions & 0 deletions synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import TYPE_CHECKING

import txredisapi

from synapse.logging.context import PreserveLoggingContext
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import COMMAND_MAP, Command, ReplicateCommand
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
from synapse.replication.tcp.handler import ReplicationCommandHandler

logger = logging.getLogger(__name__)


class RedisSubscriber(txredisapi.SubscriberProtocol):
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""Connection to redis subscribed to replication stream.

Parses incoming messages from redis into replication commands, and passes
them to `ReplicationCommandHandler`

Due to the vagaries of `txredisapi` we don't want to have a custom
constructor, so instead we expect the defined attributes below to be set
Comment on lines +43 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely following why we can't have a custom constructor (even one that has *args, **kwargs that it passes through to super().__init__()), but i guess it's not super-important.

immediately after initialisation.
"""

handler = None # type: ReplicationCommandHandler
stream_name = None # type: str
redis_connection = None # type: txredisapi.lazyConnection
conn_id = None # type: str

def connectionMade(self):
logger.info("Connected to redis instance")
self.subscribe(self.stream_name)
self.send_command(ReplicateCommand())

self.handler.new_connection(self)

def messageReceived(self, pattern: str, channel: str, message: str):
"""Received a message from redis.
"""

if message.strip() == "":
# Ignore blank lines
return

line = message
cmd_name, rest_of_line = line.split(" ", 1)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

cmd_cls = COMMAND_MAP[cmd_name]
try:
cmd = cmd_cls.from_line(rest_of_line)
except Exception as e:
logger.exception(
"[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
)
self.send_error(
"failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
)
return

# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
)

async def handle_command(self, cmd: Command):
"""Handle a command we have received over the replication stream.

By default delegates to on_<COMMAND>, which should return an awaitable.

Args:
cmd: received command
"""
handled = False

# First call any command handlers on this instance. These are for redis
# specific handling.
cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True

# Then call out to the handler.
cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True

if not handled:
logger.warning("Unhandled command: %r", cmd)

def connectionLost(self, reason):
logger.info("Lost connection to redis instance")
self.handler.lost_connection(self)

def send_command(self, cmd: Command):
"""Send a command if connection has been established.

Args:
cmd (Command)
"""
string = "%s %s" % (cmd.NAME, cmd.to_line())
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)

encoded_string = string.encode("utf-8")

async def _send():
with PreserveLoggingContext():
# Note that we use the other connection as we can't send
# commands using the subscription connection.
await self.redis_connection.publish(self.stream_name, encoded_string)

run_as_background_process("send-cmd", _send)


class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory):
"""This is a reconnecting factory that connects to redis and immediately
subscribes to a stream.
"""

maxDelay = 5
continueTrying = True
protocol = RedisSubscriber

def __init__(self, hs):
super().__init__()

# This sets the password on the RedisFactory base class (as
# SubscriberFactory constructor doesn't pass it through).
self.password = hs.config.redis.redis_password

self.handler = hs.get_tcp_replication()
self.stream_name = hs.hostname

# We need two connections to redis, one for the subscription stream and
# one to send commands to (as you can't send further redis commands to a
# connection after SUBSCIBE is called).
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
self.redis_connection = txredisapi.lazyConnection(
host=hs.config.redis_host,
port=hs.config.redis_port,
dbid=hs.config.redis_dbid,
password=hs.config.redis.redis_password,
reconnect=True,
)

self.conn_id = random_string(5)

def buildProtocol(self, addr):
p = super().buildProtocol(addr) # type: RedisSubscriber

# We do this here rather than add to the constructor of `RedisSubcriber`
# as to do so would involve overriding `buildProtocol` entirely, however
# the base method does some other things than just instantiating the
# protocol.
p.handler = self.handler
p.redis_connection = self.redis_connection
p.conn_id = self.conn_id
p.stream_name = self.stream_name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can these be declared in RedisSubscriber's constructor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The library is a bit of a mess, really. If we add these to the protocol constructor then we'll need to fully override buildProtocol, which does things other than just instantiate the protocol. So its not really clear how you're meant to use this without doing this or copy and pasting bits of the code around (though the library is designed to be copied into your repo rather than using pip, so maybe that is how you're meant to do it)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment to that effect for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

though the library is designed to be copied into your repo rather than using pip

is it? god, that doesn't inspire much confidence. Maybe we should do that then?

TBH my comment here was mostly that I wanted some recognition of the existence of these attributes in RedisSubscriber - even setting them to None is fine, so I think you've addressed this comment.

one thought for a workaround would be to make self.protocol a callable which can add a few extra kwargs which RedisSubscriber.__init__ could intercept. I'm not sure that buys a great deal though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

though the library is designed to be copied into your repo rather than using pip

is it? god, that doesn't inspire much confidence. Maybe we should do that then?

Maybe, my bias is against doing so though.


return p
4 changes: 2 additions & 2 deletions tests/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from mock import Mock, NonCallableMock

from synapse.replication.tcp.client import (
ReplicationClientFactory,
DirectTcpReplicationClientFactory,
ReplicationDataHandler,
)
from synapse.replication.tcp.handler import ReplicationCommandHandler
Expand Down Expand Up @@ -61,7 +61,7 @@ def prepare(self, reactor, clock, hs):
self.slaved_store
)

client_factory = ReplicationClientFactory(
client_factory = DirectTcpReplicationClientFactory(
self.hs, "client_name", self.replication_handler
)
client_factory.handler = self.replication_handler
Expand Down