Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Discard RDATA from already seen positions #7648

Merged
merged 18 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7648.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
In working mode, ensure that replicated data has not already been received.
5 changes: 5 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,11 @@ async def _process_and_notify(self, stream_name, instance_name, token, rows):
except Exception:
logger.exception("Error processing replication")

async def on_position(self, stream_name: str, instance_name: str, token: int):
await super().on_position(stream_name, instance_name, token)
# Also call on_rdata to ensure that stream positions are properly reset.
await self.on_rdata(stream_name, instance_name, token, [])
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably needs a comment, but I'm not really sure what to suggest...

Also call on_rdata to ensure that stream positions are properly reset.

Or something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, something like that. I don't have any better suggestions at least :/


def stop_pusher(self, user_id, app_id, pushkey):
if not self.notify_pushers:
return
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def get_logcontext_id(self):


class PositionCommand(Command):
"""Sent by the server to tell the client the stream postition without
"""Sent by the server to tell the client the stream position without
needing to send an RDATA.

Format::
Expand Down Expand Up @@ -188,7 +188,7 @@ class ErrorCommand(_SimpleCommand):


class PingCommand(_SimpleCommand):
"""Sent by either side as a keep alive. The data is arbitary (often timestamp)
"""Sent by either side as a keep alive. The data is arbitrary (often timestamp)
"""

NAME = "PING"
Expand Down
30 changes: 26 additions & 4 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ def __init__(self, hs):
"replication_position", clock=self._clock
)

# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
# Map of stream name to batched updates. See RdataCommand for info on
# how batching works.
self._pending_batches = {} # type: Dict[str, List[Any]]

# The factory used to create connections.
Expand All @@ -123,7 +123,8 @@ def __init__(self, hs):
# outgoing replication commands to.)
self._connections = [] # type: List[AbstractConnection]

# For each connection, the incoming streams that are coming from that connection
# For each connection, the incoming stream names that are coming from
# that connection.
self._streams_by_connection = {} # type: Dict[AbstractConnection, Set[str]]

LaterGauge(
Expand Down Expand Up @@ -310,7 +311,28 @@ async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
# Check if this is the last of a batch of updates
rows = self._pending_batches.pop(stream_name, [])
rows.append(row)
await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)

stream = self._streams.get(stream_name)
if not stream:
logger.error("Got RDATA for unknown stream: %s", stream_name)
return

# Find where we previously streamed up to.
current_token = stream.current_token(cmd.instance_name)

# Discard this data if this token is earlier than the current
# position. Note that streams can be reset (in which case you
# expect an earlier token), but that must be preceded by a
# POSITION command.
if cmd.token <= current_token:
logger.debug(
"Discarding RDATA from stream %s at position %s before previous position %s",
stream_name,
cmd.token,
current_token,
)
else:
await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)

async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
Expand Down
74 changes: 59 additions & 15 deletions tests/replication/tcp/streams/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.replication.tcp.commands import RdataCommand
from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
from synapse.replication.tcp.streams.events import (
EventsStreamCurrentStateRow,
Expand Down Expand Up @@ -66,11 +67,6 @@ def test_update_function_event_row_limit(self):
# also one state event
state_event = self._inject_state_event()

# tell the notifier to catch up to avoid duplicate rows.
# workaround for https://github.com/matrix-org/synapse/issues/7360
# FIXME remove this when the above is fixed
self.replicate()

# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual([], self.test_handler.received_rdata_rows)
Expand Down Expand Up @@ -174,11 +170,6 @@ def test_update_function_huge_state_change(self):
# one more bit of state that doesn't get rolled back
state2 = self._inject_state_event()

# tell the notifier to catch up to avoid duplicate rows.
# workaround for https://github.com/matrix-org/synapse/issues/7360
# FIXME remove this when the above is fixed
self.replicate()

# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual([], self.test_handler.received_rdata_rows)
Expand Down Expand Up @@ -327,11 +318,6 @@ def test_update_function_state_row_limit(self):
prev_events = [e.event_id]
pl_events.append(e)

# tell the notifier to catch up to avoid duplicate rows.
# workaround for https://github.com/matrix-org/synapse/issues/7360
# FIXME remove this when the above is fixed
self.replicate()

# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual([], self.test_handler.received_rdata_rows)
Expand Down Expand Up @@ -378,6 +364,64 @@ def test_update_function_state_row_limit(self):

self.assertEqual([], received_rows)

def test_backwards_stream_id(self):
"""
Test that RDATA that comes after the current position should be discarded.
"""
# disconnect, so that we can stack up some changes
self.disconnect()

# Generate an events. We inject them using inject_event so that they are
# not send out over replication until we call self.replicate().
event = self._inject_test_event()

# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual([], self.test_handler.received_rdata_rows)

# now reconnect to pull the updates
self.reconnect()
self.replicate()

# We should have received the expected single row (as well as various
# cache invalidation updates which we ignore).
received_rows = [
row for row in self.test_handler.received_rdata_rows if row[0] == "events"
]

# There should be a single received row.
self.assertEqual(len(received_rows), 1)

stream_name, token, row = received_rows[0]
self.assertEqual("events", stream_name)
self.assertIsInstance(row, EventsStreamRow)
self.assertEqual(row.type, "ev")
self.assertIsInstance(row.data, EventsStreamEventRow)
self.assertEqual(row.data.event_id, event.event_id)

# Reset the data.
self.test_handler.received_rdata_rows = []

# Save the current token for later.
worker_events_stream = self.worker_hs.get_replication_streams()["events"]
prev_token = worker_events_stream.current_token("master")

# Manually send an old RDATA command, which should get dropped. This
# re-uses the row from above, but with an earlier stream token.
self.hs.get_tcp_replication().send_command(
RdataCommand("events", "master", 1, row)
)

# No updates have been received (because it was discard as old).
received_rows = [
row for row in self.test_handler.received_rdata_rows if row[0] == "events"
]
self.assertEqual(len(received_rows), 0)

clokep marked this conversation as resolved.
Show resolved Hide resolved
# Ensure the stream has not gone backwards.
current_token = worker_events_stream.current_token("master")
self.assertGreaterEqual(current_token, prev_token)

event_count = 0

def _inject_test_event(
Expand Down
88 changes: 82 additions & 6 deletions tests/replication/tcp/streams/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@

from synapse.handlers.typing import RoomMember
from synapse.replication.tcp.streams import TypingStream
from synapse.util.caches.stream_change_cache import StreamChangeCache

from tests.replication._base import BaseStreamTestCase

USER_ID = "@feeling:blue"
USER_ID_2 = "@da-ba-dee:blue"

ROOM_ID = "!bar:blue"
ROOM_ID_2 = "!foo:blue"


class TypingStreamTestCase(BaseStreamTestCase):
Expand All @@ -29,11 +34,9 @@ def _build_replication_data_handler(self):
def test_typing(self):
typing = self.hs.get_typing_handler()

room_id = "!bar:blue"

self.reconnect()

typing._push_update(member=RoomMember(room_id, USER_ID), typing=True)
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)

self.reactor.advance(0)

Expand All @@ -46,15 +49,15 @@ def test_typing(self):
self.assertEqual(stream_name, "typing")
self.assertEqual(1, len(rdata_rows))
row = rdata_rows[0] # type: TypingStream.TypingStreamRow
self.assertEqual(room_id, row.room_id)
self.assertEqual(ROOM_ID, row.room_id)
self.assertEqual([USER_ID], row.user_ids)

# Now let's disconnect and insert some data.
self.disconnect()

self.test_handler.on_rdata.reset_mock()

typing._push_update(member=RoomMember(room_id, USER_ID), typing=False)
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=False)

self.test_handler.on_rdata.assert_not_called()

Expand All @@ -73,5 +76,78 @@ def test_typing(self):
self.assertEqual(stream_name, "typing")
self.assertEqual(1, len(rdata_rows))
row = rdata_rows[0]
self.assertEqual(room_id, row.room_id)
self.assertEqual(ROOM_ID, row.room_id)
self.assertEqual([], row.user_ids)

def test_reset(self):
"""
Test what happens when a typing stream resets.

This is emulated by jumping the stream ahead, then reconnecting (which
sends the proper position and RDATA).
"""
typing = self.hs.get_typing_handler()

self.reconnect()

typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)

self.reactor.advance(0)

# We should now see an attempt to connect to the master
request = self.handle_http_replication_attempt()
self.assert_request_is_get_repl_stream_updates(request, "typing")

self.test_handler.on_rdata.assert_called_once()
stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
self.assertEqual(stream_name, "typing")
self.assertEqual(1, len(rdata_rows))
row = rdata_rows[0] # type: TypingStream.TypingStreamRow
self.assertEqual(ROOM_ID, row.room_id)
self.assertEqual([USER_ID], row.user_ids)

# Push the stream forward a bunch so it can be reset.
for i in range(100):
typing._push_update(
member=RoomMember(ROOM_ID, "@test%s:blue" % i), typing=True
)
self.reactor.advance(0)

# Disconnect.
self.disconnect()

# Reset the typing handler
self.hs.get_replication_streams()["typing"].last_token = 0
self.hs.get_tcp_replication()._streams["typing"].last_token = 0
typing._latest_room_serial = 0
typing._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", typing._latest_room_serial
)
typing._reset()

# Reconnect.
self.reconnect()
self.pump(0.1)

# We should now see an attempt to connect to the master
request = self.handle_http_replication_attempt()
self.assert_request_is_get_repl_stream_updates(request, "typing")

clokep marked this conversation as resolved.
Show resolved Hide resolved
# Reset the test code.
self.test_handler.on_rdata.reset_mock()
self.test_handler.on_rdata.assert_not_called()

# Push additional data.
typing._push_update(member=RoomMember(ROOM_ID_2, USER_ID_2), typing=False)
self.reactor.advance(0)

self.test_handler.on_rdata.assert_called_once()
stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
self.assertEqual(stream_name, "typing")
self.assertEqual(1, len(rdata_rows))
row = rdata_rows[0]
self.assertEqual(ROOM_ID_2, row.room_id)
self.assertEqual([], row.user_ids)
clokep marked this conversation as resolved.
Show resolved Hide resolved

# The token should have been reset.
self.assertEqual(token, 1)