Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Discard RDATA from already seen positions #7648

Merged
merged 18 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7648.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
In working mode, ensure that replicated data has not already been received.
4 changes: 2 additions & 2 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def get_logcontext_id(self):


class PositionCommand(Command):
"""Sent by the server to tell the client the stream postition without
"""Sent by the server to tell the client the stream position without
needing to send an RDATA.

Format::
Expand Down Expand Up @@ -188,7 +188,7 @@ class ErrorCommand(_SimpleCommand):


class PingCommand(_SimpleCommand):
"""Sent by either side as a keep alive. The data is arbitary (often timestamp)
"""Sent by either side as a keep alive. The data is arbitrary (often timestamp)
"""

NAME = "PING"
Expand Down
28 changes: 24 additions & 4 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ def __init__(self, hs):
"replication_position", clock=self._clock
)

# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
# Map of stream name to batched updates. See RdataCommand for info on
# how batching works.
self._pending_batches = {} # type: Dict[str, List[Any]]

# The factory used to create connections.
Expand All @@ -123,7 +123,8 @@ def __init__(self, hs):
# outgoing replication commands to.)
self._connections = [] # type: List[AbstractConnection]

# For each connection, the incoming streams that are coming from that connection
# For each connection, the incoming stream names that are coming from
# that connection.
self._streams_by_connection = {} # type: Dict[AbstractConnection, Set[str]]

LaterGauge(
Expand Down Expand Up @@ -310,7 +311,26 @@ async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
# Check if this is the last of a batch of updates
rows = self._pending_batches.pop(stream_name, [])
rows.append(row)
await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)

stream = self._streams.get(stream_name)
if not stream:
logger.error("Got RDATA for unknown stream: %s", stream_name)
return

# Find where we previously streamed up to.
current_token = stream.current_token(cmd.instance_name)

# Discard this data if this token is earlier than the current
# position.
clokep marked this conversation as resolved.
Show resolved Hide resolved
if cmd.token <= current_token:
logger.debug(
"Discarding RDATA from stream %s at POSITION %s before previous POSITION %s",
clokep marked this conversation as resolved.
Show resolved Hide resolved
stream_name,
cmd.token,
current_token,
)
else:
await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)

async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
Expand Down
66 changes: 51 additions & 15 deletions tests/replication/tcp/streams/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.replication.tcp.commands import RdataCommand
from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
from synapse.replication.tcp.streams.events import (
EventsStreamCurrentStateRow,
Expand Down Expand Up @@ -66,11 +67,6 @@ def test_update_function_event_row_limit(self):
# also one state event
state_event = self._inject_state_event()

# tell the notifier to catch up to avoid duplicate rows.
# workaround for https://github.com/matrix-org/synapse/issues/7360
# FIXME remove this when the above is fixed
self.replicate()

# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual([], self.test_handler.received_rdata_rows)
Expand Down Expand Up @@ -174,11 +170,6 @@ def test_update_function_huge_state_change(self):
# one more bit of state that doesn't get rolled back
state2 = self._inject_state_event()

# tell the notifier to catch up to avoid duplicate rows.
# workaround for https://github.com/matrix-org/synapse/issues/7360
# FIXME remove this when the above is fixed
self.replicate()

# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual([], self.test_handler.received_rdata_rows)
Expand Down Expand Up @@ -327,11 +318,6 @@ def test_update_function_state_row_limit(self):
prev_events = [e.event_id]
pl_events.append(e)

# tell the notifier to catch up to avoid duplicate rows.
# workaround for https://github.com/matrix-org/synapse/issues/7360
# FIXME remove this when the above is fixed
self.replicate()

# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual([], self.test_handler.received_rdata_rows)
Expand Down Expand Up @@ -378,6 +364,56 @@ def test_update_function_state_row_limit(self):

self.assertEqual([], received_rows)

def test_backwards_stream_id(self):
"""
Test that RDATA that comes after the current position should be discarded.
"""
# disconnect, so that we can stack up some changes
self.disconnect()

# Generate an events. We inject them using inject_event so that they are
# not send out over replication until we call self.replicate().
event = self._inject_test_event()

# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual([], self.test_handler.received_rdata_rows)

# now reconnect to pull the updates
self.reconnect()
self.replicate()

# We should have received the expected single row (as well as various
# cache invalidation updates which we ignore).
received_rows = [
row for row in self.test_handler.received_rdata_rows if row[0] == "events"
]

# There should be a single received row.
self.assertEqual(len(received_rows), 1)

stream_name, token, row = received_rows[0]
self.assertEqual("events", stream_name)
self.assertIsInstance(row, EventsStreamRow)
self.assertEqual(row.type, "ev")
self.assertIsInstance(row.data, EventsStreamEventRow)
self.assertEqual(row.data.event_id, event.event_id)

# Reset the data.
self.test_handler.received_rdata_rows = []

# Manually send an old RDATA command, which should get dropped. This
# re-uses the row from above, but with an earlier stream token.
self.hs.get_tcp_replication().send_command(
RdataCommand("events", "master", 1, row)
)

# No updates have been received (because it was discard as old).
received_rows = [
row for row in self.test_handler.received_rdata_rows if row[0] == "events"
]
self.assertEqual(len(received_rows), 0)

clokep marked this conversation as resolved.
Show resolved Hide resolved
event_count = 0

def _inject_test_event(
Expand Down
63 changes: 63 additions & 0 deletions tests/replication/tcp/streams/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from mock import Mock

from synapse.handlers.typing import RoomMember
from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.streams import TypingStream

from tests.replication._base import BaseStreamTestCase
Expand Down Expand Up @@ -75,3 +76,65 @@ def test_typing(self):
row = rdata_rows[0]
self.assertEqual(room_id, row.room_id)
self.assertEqual([], row.user_ids)

def test_reset(self):
"""
Test what happens when a typing stream resets.

This is emulated by jumping the stream ahead, then reconnecting (which
sends the proper position and RDATA).
"""
typing = self.hs.get_typing_handler()

room_id = "!bar:blue"

self.reconnect()

typing._push_update(member=RoomMember(room_id, USER_ID), typing=True)

self.reactor.advance(0)

# We should now see an attempt to connect to the master
request = self.handle_http_replication_attempt()
self.assert_request_is_get_repl_stream_updates(request, "typing")

self.test_handler.on_rdata.assert_called_once()
stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
self.assertEqual(stream_name, "typing")
self.assertEqual(1, len(rdata_rows))
row = rdata_rows[0] # type: TypingStream.TypingStreamRow
self.assertEqual(room_id, row.room_id)
self.assertEqual([USER_ID], row.user_ids)

# Jump the stream ahead manually, the state of the master is not
# modified, however.
self.hs.get_tcp_replication().send_command(
PositionCommand("typing", "master", 100)
)
clokep marked this conversation as resolved.
Show resolved Hide resolved

# Now let's disconnect and insert some data.
self.disconnect()

self.test_handler.on_rdata.reset_mock()

typing._push_update(member=RoomMember(room_id, USER_ID), typing=False)

self.test_handler.on_rdata.assert_not_called()

self.reconnect()
self.pump(0.1)

# We should now see an attempt to connect to the master
request = self.handle_http_replication_attempt()
self.assert_request_is_get_repl_stream_updates(request, "typing")

clokep marked this conversation as resolved.
Show resolved Hide resolved
# The from token should be the token from the last RDATA we got.
self.assertEqual(int(request.args[b"from_token"][0]), token)

self.test_handler.on_rdata.assert_called_once()
stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
self.assertEqual(stream_name, "typing")
self.assertEqual(1, len(rdata_rows))
row = rdata_rows[0]
self.assertEqual(room_id, row.room_id)
self.assertEqual([], row.user_ids)
clokep marked this conversation as resolved.
Show resolved Hide resolved