This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

On catchup, process each row with its own stream id #7286

Merged 5 commits on Apr 20, 2020
1 change: 1 addition & 0 deletions changelog.d/7286.misc
@@ -0,0 +1 @@
+Move catchup of replication streams logic to worker.
73 changes: 68 additions & 5 deletions synapse/replication/tcp/handler.py
@@ -15,7 +15,18 @@
# limitations under the License.

import logging
-from typing import Any, Callable, Dict, List, Optional, Set
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    TypeVar,
+)

from prometheus_client import Counter

@@ -268,11 +279,14 @@ async def on_POSITION(self, cmd: PositionCommand):
                missing_updates,
            ) = await stream.get_updates_since(current_token, cmd.token)

-           if updates:
+           # TODO: add some tests for this
+
+           # Some streams return multiple rows with the same stream IDs,
+           # which need to be processed in batches.
+
+           for token, rows in _batch_updates(updates):
                await self.on_rdata(
-                   cmd.stream_name,
-                   current_token,
-                   [stream.parse_row(update[1]) for update in updates],
+                   cmd.stream_name, token, [stream.parse_row(row) for row in rows],
                )

        # We've now caught up to position sent to us, notify handler.
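
To make the new behaviour concrete, a hypothetical illustration (row names invented; parse stands in for stream.parse_row): three updates spanning two stream ids are now delivered as two on_rdata calls, each with its own token:

    # updates, as returned by get_updates_since: a list of (token, row) pairs
    #   updates = [(1, row_a), (1, row_b), (2, row_c)]
    # the loop above turns these into two calls:
    #   on_rdata(stream_name, 1, [parse(row_a), parse(row_b)])
    #   on_rdata(stream_name, 2, [parse(row_c)])
    # whereas the old code made a single call attributing all three rows to
    # one token.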
@@ -404,3 +418,52 @@ def stream_update(self, stream_name: str, token: str, data: Any):
        We need to check if the client is interested in the stream or not
        """
        self.send_command(RdataCommand(stream_name, token, data))


Review thread on def _batch_updates:

Member: Why do we need a new impl for this? Can't we share the existing one?

Member Author: Well, it batches them in a slightly different way. I agree that it would be nice not to have two different impls, but I couldn't easily think of a way to share the code.

Member: Oh yes, of course it needs to be different.

+UpdateToken = TypeVar("UpdateToken")
+UpdateRow = TypeVar("UpdateRow")
+
+
+def _batch_updates(
+    updates: Iterable[Tuple[UpdateToken, UpdateRow]]
+) -> Iterator[Tuple[UpdateToken, List[UpdateRow]]]:
+    """Collect stream updates with the same token together
+
+    Given a series of updates returned by Stream.get_updates_since(), collects
+    the updates which share the same stream_id together.
+
+    For example:
+
+        [(1, a), (1, b), (2, c), (3, d), (3, e)]
+
+    becomes:
+
+        [
+            (1, [a, b]),
+            (2, [c]),
+            (3, [d, e]),
+        ]
+    """
+
+    update_iter = iter(updates)
+
+    first_update = next(update_iter, None)
+    if first_update is None:
+        # empty input
+        return
+
+    current_batch_token = first_update[0]
+    current_batch = [first_update[1]]
+
+    for token, row in update_iter:
+        if token != current_batch_token:
+            # different token to the previous row: flush the previous
+            # batch and start anew
+            yield current_batch_token, current_batch
+            current_batch_token = token
+            current_batch = []
+
+        current_batch.append(row)
+
+    # flush the final batch
+    yield current_batch_token, current_batch
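
As a quick illustration of the batching (a hypothetical snippet, not part of the diff; values mirror the docstring example above):

    updates = [(1, "a"), (1, "b"), (2, "c"), (3, "d"), (3, "e")]

    # consecutive updates sharing a token are grouped into a single batch
    assert list(_batch_updates(updates)) == [
        (1, ["a", "b"]),
        (2, ["c"]),
        (3, ["d", "e"]),
    ]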
3 changes: 3 additions & 0 deletions synapse/util/caches/stream_change_cache.py
@@ -126,6 +126,9 @@ def entity_has_changed(self, entity, stream_pos):
        """
        assert type(stream_pos) is int

+       # FIXME: add a sanity check here that we are not overwriting existing
+       # data in self._cache
+
        if stream_pos > self._earliest_known_stream_pos:
            old_pos = self._entity_to_key.get(entity, None)
            if old_pos is not None:
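
The FIXME above asks for a sanity check. A minimal sketch of what it might look like (hypothetical: this assumes self._cache maps each stream position to a single entity, so a second entity at the same position would silently replace the first, and that a module-level logger exists as elsewhere in synapse):

    existing = self._cache.get(stream_pos)
    if existing is not None and existing != entity:
        # hypothetical check: overwriting would lose the record that
        # `existing` changed at this position, so at least log it
        logger.warning(
            "Overwriting change at stream_pos %s: %s -> %s",
            stream_pos, existing, entity,
        )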