This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Replace HTTP replication with TCP replication #2069
Closed
Closed
Changes from 1 commit
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
35b4aa0
Notify on new federation traffic
erikjohnston ae72a0b
Add new storage functions for new replication
erikjohnston 86da2ec
Add a simple hook to wait for replication traffic
erikjohnston 6d97fe4
Make federation send queue take the current position
erikjohnston 0762bf3
Define the various streams we will replicate
erikjohnston d5fb256
Initial TCP protocol implementation
erikjohnston 390511f
Add server side resource for tcp replication
erikjohnston 78d127a
Add functions to presence to support remote syncs
erikjohnston faffead
Add basic replication client handler and factory
erikjohnston 510c547
Change slave storage to use new replication interface
erikjohnston ae16e25
Update all the workers and master to use TCP replication
erikjohnston File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,3 +12,55 @@ | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""This module implements the TCP replication protocol used by synapse to | ||
communicate between the master process and its workers (when they're enabled). | ||
|
||
The protocol is based on fire and forget, line based commands. An example flow | ||
would be (where '>' indicates master->worker and '<' worker->master flows):: | ||
|
||
> SERVER example.com | ||
< REPLICATE events 53 | ||
> RDATA events 54 ["$foo1:bar.com", ...] | ||
> RDATA events 55 ["$foo4:bar.com", ...] | ||
|
||
The example shows the server accepting a new connection and sending its identity | ||
with the `SERVER` command, followed by the client asking to subscribe to the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rst uses double-backticks for fixed-width text... |
||
`events` stream from the token `53`. The server then periodically sends `RDATA` | ||
commands which have the format `RDATA <stream_name> <token> <row>`, where the | ||
format of `<row>` is defined by the individual streams. | ||
|
||
Error reporting happens by either the client or server sending an `ERROR` | ||
command, and usually the connection will be closed. | ||
|
||
|
||
Structure of the module: | ||
* client.py - the client classes used for workers to connect to master | ||
* command.py - the definitions of all the valid commands | ||
* protocol.py - contains bot the client and server protocol implementations, | ||
these should not be used directly | ||
* resource.py - the server classes that accepts and handle client connections | ||
* streams.py - the definitons of all the valid streams | ||
|
||
Further detail about the wire protocol can be found in protocol.py and the | ||
meaning of the various commands in command.py. | ||
|
||
|
||
Since the protocol is a simple line based, its possible to manually connect to | ||
the server using a tool like netcat. A few things should be noted when manually | ||
using the protocol: | ||
* When subscribing to a stream using `REPLICATE`, the special token `NOW` can | ||
be used to get all future updates. The special stream name `ALL` can be used | ||
with `NOW` to subscribe to all available streams. | ||
* The federation stream is only available if federation sending has been | ||
disabled on the main process. | ||
* The server will only time connections out that have sent a `PING` command. | ||
If a ping is sent then the connection will be closed if no further commands | ||
are receieved within 15s. Both the client and server protocol implementations | ||
will send an initial PING on connection and ensure at least one command every | ||
5s is sent (not necessarily `PING`). | ||
* `RDATA` commands *usually* include a numeric token, however if the stream | ||
has multiple rows to replicate per token the server will send multiple | ||
`RDATA` commands, with all but the last having a token of `batch`. See | ||
the documentation on `commands.RdataCommand` for further details. | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,341 @@ | ||
# -*- coding: utf-8 -*- | ||
# Copyright 2017 Vector Creations Ltd | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Defines the various valid commands | ||
|
||
The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are | ||
allowed to be sent by which side. | ||
""" | ||
|
||
import logging | ||
import ujson as json | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class Command(object): | ||
"""The base command class. | ||
|
||
All subclasses must set the NAME variable which equates to the name of the | ||
command on the wire. | ||
|
||
A full command line on the wire is constructed from `NAME + " " + to_line()` | ||
|
||
The default implementation creates a command of form `<NAME> <data>` | ||
""" | ||
NAME = None | ||
|
||
def __init__(self, data): | ||
self.data = data | ||
|
||
@classmethod | ||
def from_line(cls, line): | ||
"""Deserialises a line from the wire into this command. `line` does not | ||
include the command. | ||
""" | ||
return cls(line) | ||
|
||
def to_line(self): | ||
"""Serialises the comamnd for the wire. Does not include the command | ||
prefix. | ||
""" | ||
return self.data | ||
|
||
|
||
class ServerCommand(Command): | ||
"""Sent by the server on new connection and includes the server_name. | ||
|
||
Format:: | ||
|
||
SERVER <server_name> | ||
""" | ||
NAME = "SERVER" | ||
|
||
|
||
class RdataCommand(Command): | ||
"""Sent by server when a subscribed stream has an update. | ||
|
||
Format:: | ||
|
||
RDATA <stream_name> <token> <row_json> | ||
|
||
The `<token>` may either be a numeric stream id OR "batch". The latter case | ||
is used to support sending multiple updates with the same stream ID. This | ||
is done by sending an RDATA for each row, with all but the last RDATA having | ||
a token of "batch" and the last having the final stream ID. | ||
|
||
The client should batch all incoming RDATA with a token of "batch" (per | ||
stream_name) until it sees an RDATA with a numeric stream ID. | ||
|
||
`<token>` of "batch" maps to the instance variable `token` being None. | ||
|
||
An example of a batched series of RDATA:: | ||
|
||
RDATA presence batch ["@foo:example.com", "online", ...] | ||
RDATA presence batch ["@bar:example.com", "online", ...] | ||
RDATA presence 59 ["@baz:example.com", "online", ...] | ||
""" | ||
NAME = "RDATA" | ||
|
||
def __init__(self, stream_name, token, row): | ||
self.stream_name = stream_name | ||
self.token = token | ||
self.row = row | ||
|
||
@classmethod | ||
def from_line(cls, line): | ||
stream_name, token, row_json = line.split(" ", 2) | ||
return cls( | ||
stream_name, | ||
None if token == "batch" else int(token), | ||
json.loads(row_json) | ||
) | ||
|
||
def to_line(self): | ||
return " ".join(( | ||
self.stream_name, | ||
str(self.token) if self.token is not None else "batch", | ||
json.dumps(self.row), | ||
)) | ||
|
||
|
||
class PositionCommand(Command): | ||
"""Sent by the client to tell the client the stream postition without | ||
needing to send an RDATA. | ||
""" | ||
NAME = "POSITION" | ||
|
||
def __init__(self, stream_name, token): | ||
self.stream_name = stream_name | ||
self.token = token | ||
|
||
@classmethod | ||
def from_line(cls, line): | ||
stream_name, token = line.split(" ", 1) | ||
return cls(stream_name, int(token)) | ||
|
||
def to_line(self): | ||
return " ".join((self.stream_name, str(self.token),)) | ||
|
||
|
||
class ErrorCommand(Command): | ||
"""Sent by either side if there was an ERROR. The data is a string describing | ||
the error. | ||
""" | ||
NAME = "ERROR" | ||
|
||
|
||
class PingCommand(Command): | ||
"""Sent by either side as a keep alive. The data is arbitary (often timestamp) | ||
""" | ||
NAME = "PING" | ||
|
||
|
||
class NameCommand(Command): | ||
"""Sent by client to inform the server of the client's identity. The data | ||
is the name | ||
""" | ||
NAME = "NAME" | ||
|
||
|
||
class ReplicateCommand(Command): | ||
"""Sent by the client to subsribe to the stream. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. subsribe |
||
|
||
Format:: | ||
|
||
REPLICATE <stream_name> <token> | ||
|
||
Where <token> may be either: | ||
* a numeric stream_id to stream updates from | ||
* "NOW" to stream all subsequent updates. | ||
|
||
The <stream_name> can be "ALL" to subscribe to all known streams, in which | ||
case the <token> must be set to "NOW", i.e.:: | ||
|
||
REPLICATE ALL NOW | ||
""" | ||
NAME = "REPLICATE" | ||
|
||
def __init__(self, stream_name, token): | ||
self.stream_name = stream_name | ||
self.token = token | ||
|
||
@classmethod | ||
def from_line(cls, line): | ||
stream_name, token = line.split(" ", 1) | ||
if token in ("NOW", "now"): | ||
token = "NOW" | ||
else: | ||
token = int(token) | ||
return cls(stream_name, token) | ||
|
||
def to_line(self): | ||
return " ".join((self.stream_name, str(self.token),)) | ||
|
||
|
||
class UserSyncCommand(Command): | ||
"""Sent by the client to inform the server that a user has started or | ||
stopped syncing. Used to calculate presence on the master. | ||
|
||
Format:: | ||
|
||
USER_SYNC <user_id> <state> | ||
|
||
Where <state> is either "start" or "stop" | ||
""" | ||
NAME = "USER_SYNC" | ||
|
||
def __init__(self, user_id, is_syncing): | ||
self.user_id = user_id | ||
self.is_syncing = is_syncing | ||
|
||
@classmethod | ||
def from_line(cls, line): | ||
user_id, state = line.split(" ", 1) | ||
|
||
if state not in ("start", "end"): | ||
raise Exception("Invalid USER_SYNC state %r" % (state,)) | ||
|
||
return cls(user_id, state == "start") | ||
|
||
def to_line(self): | ||
return " ".join((self.user_id, "start" if self.is_syncing else "end")) | ||
|
||
|
||
class FederationAckCommand(Command): | ||
"""Sent by the client when its processed upto a given point in the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. its -> it has |
||
federation stream. This allows the master to drop in memory caches of the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in-memory |
||
federation stream. | ||
|
||
This must only be sent from one worker (i.e. the one sending federation) | ||
|
||
Format:: | ||
|
||
FEDERATION_ACK <token> | ||
""" | ||
NAME = "FEDERATION_ACK" | ||
|
||
def __init__(self, token): | ||
self.token = token | ||
|
||
@classmethod | ||
def from_line(cls, line): | ||
return cls(int(line)) | ||
|
||
def to_line(self): | ||
return str(self.token) | ||
|
||
|
||
class SyncCommand(Command): | ||
"""Used for testing. The client protocol implementation allows waiting | ||
on a SYNC command with a specified data. | ||
""" | ||
NAME = "SYNC" | ||
|
||
|
||
class RemovePusherCommand(Command): | ||
"""Sent by the client to request the master remove the given pusher. | ||
|
||
Format:: | ||
|
||
REMOVE_PUSHER <app_id> <push_key> <user_id> | ||
""" | ||
NAME = "REMOVE_PUSHER" | ||
|
||
def __init__(self, app_id, push_key, user_id): | ||
self.user_id = user_id | ||
self.app_id = app_id | ||
self.push_key = push_key | ||
|
||
@classmethod | ||
def from_line(cls, line): | ||
app_id, push_key, user_id = line.split(" ", 2) | ||
|
||
return cls(app_id, push_key, user_id) | ||
|
||
def to_line(self): | ||
return " ".join((self.app_id, self.push_key, self.user_id)) | ||
|
||
|
||
class InvalidateCacheCommand(Command): | ||
"""Sent by the client to invalidate an upstream cache. | ||
|
||
THIS IS NOT RELIABLE, AND SHOULD *NOT* BE USED ACCEPT FOR THINGS THAT ARE | ||
NOT DISASTROUS IF WE DROP ON THE FLOOR. | ||
|
||
Mainly used to invalidate destination retry timing caches. | ||
|
||
Format:: | ||
|
||
INVALIDATE_CACHE <cache_func> <keys_json> | ||
|
||
Where <keys_json> is a json list. | ||
""" | ||
NAME = "INVALIDATE_CACHE" | ||
|
||
def __init__(self, cache_func, keys): | ||
self.cache_func = cache_func | ||
self.keys = keys | ||
|
||
@classmethod | ||
def from_line(cls, line): | ||
cache_func, keys_json = line.split(" ", 1) | ||
|
||
return cls(cache_func, json.loads(keys_json)) | ||
|
||
def to_line(self): | ||
return " ".join((self.cache_func, json.dumps(self.keys))) | ||
|
||
|
||
# Map of command name to command type. | ||
COMMAND_MAP = { | ||
cmd.NAME: cmd | ||
for cmd in ( | ||
ServerCommand, | ||
RdataCommand, | ||
PositionCommand, | ||
ErrorCommand, | ||
PingCommand, | ||
NameCommand, | ||
ReplicateCommand, | ||
UserSyncCommand, | ||
FederationAckCommand, | ||
SyncCommand, | ||
RemovePusherCommand, | ||
InvalidateCacheCommand, | ||
) | ||
} | ||
|
||
# The commands the server is allowed to send | ||
VALID_SERVER_COMMANDS = ( | ||
ServerCommand.NAME, | ||
RdataCommand.NAME, | ||
PositionCommand.NAME, | ||
ErrorCommand.NAME, | ||
PingCommand.NAME, | ||
SyncCommand.NAME, | ||
) | ||
|
||
# The commands the client is allowed to send | ||
VALID_CLIENT_COMMANDS = ( | ||
NameCommand.NAME, | ||
ReplicateCommand.NAME, | ||
PingCommand.NAME, | ||
UserSyncCommand.NAME, | ||
FederationAckCommand.NAME, | ||
RemovePusherCommand.NAME, | ||
InvalidateCacheCommand.NAME, | ||
ErrorCommand.NAME, | ||
) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I highly recommend moving the protocol docs out to separate rst files in
docs
. It'll be easier to find, and easier to read, there.