Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
On catchup, process each row with its own stream id (#7286)
Browse files Browse the repository at this point in the history
Other parts of the code (such as the StreamChangeCache) assume that there will
not be multiple changes with the same stream id.

This code was introduced in #7024, and I hope this fixes #7206.
  • Loading branch information
richvdh authored Apr 20, 2020
1 parent 054c231 commit 0f8f02b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog.d/7286.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move catchup of replication streams logic to worker.
73 changes: 68 additions & 5 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,18 @@
# limitations under the License.

import logging
from typing import Any, Callable, Dict, List, Optional, Set
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Set,
Tuple,
TypeVar,
)

from prometheus_client import Counter

Expand Down Expand Up @@ -268,11 +279,14 @@ async def on_POSITION(self, cmd: PositionCommand):
missing_updates,
) = await stream.get_updates_since(current_token, cmd.token)

if updates:
# TODO: add some tests for this

# Some streams return multiple rows with the same stream IDs,
# which need to be processed in batches.

for token, rows in _batch_updates(updates):
await self.on_rdata(
cmd.stream_name,
current_token,
[stream.parse_row(update[1]) for update in updates],
cmd.stream_name, token, [stream.parse_row(row) for row in rows],
)

# We've now caught up to position sent to us, notify handler.
Expand Down Expand Up @@ -404,3 +418,52 @@ def stream_update(self, stream_name: str, token: str, data: Any):
We need to check if the client is interested in the stream or not
"""
self.send_command(RdataCommand(stream_name, token, data))


UpdateToken = TypeVar("UpdateToken")
UpdateRow = TypeVar("UpdateRow")


def _batch_updates(
updates: Iterable[Tuple[UpdateToken, UpdateRow]]
) -> Iterator[Tuple[UpdateToken, List[UpdateRow]]]:
"""Collect stream updates with the same token together
Given a series of updates returned by Stream.get_updates_since(), collects
the updates which share the same stream_id together.
For example:
[(1, a), (1, b), (2, c), (3, d), (3, e)]
becomes:
[
(1, [a, b]),
(2, [c]),
(3, [d, e]),
]
"""

update_iter = iter(updates)

first_update = next(update_iter, None)
if first_update is None:
# empty input
return

current_batch_token = first_update[0]
current_batch = [first_update[1]]

for token, row in update_iter:
if token != current_batch_token:
# different token to the previous row: flush the previous
# batch and start anew
yield current_batch_token, current_batch
current_batch_token = token
current_batch = []

current_batch.append(row)

# flush the final batch
yield current_batch_token, current_batch
3 changes: 3 additions & 0 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def entity_has_changed(self, entity, stream_pos):
"""
assert type(stream_pos) is int

# FIXME: add a sanity check here that we are not overwriting existing
# data in self._cache

if stream_pos > self._earliest_known_stream_pos:
old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
Expand Down

0 comments on commit 0f8f02b

Please sign in to comment.