This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce log spam when running multiple event persisters #12610

Merged 3 commits on May 5, 2022
1 change: 1 addition & 0 deletions changelog.d/12610.misc
@@ -0,0 +1 @@
Reduce log spam when running multiple event persisters.
9 changes: 7 additions & 2 deletions synapse/replication/tcp/handler.py
@@ -537,7 +537,7 @@ def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand) -> None:
# Ignore POSITION that are just our own echoes
return

logger.info("Handling '%s %s'", cmd.NAME, cmd.to_line())
logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())

self._add_command_to_stream_queue(conn, cmd)

@@ -567,6 +567,11 @@ async def _process_position(
# between then and now.
missing_updates = cmd.prev_token != current_token
while missing_updates:
# Note: There may very well not be any new updates, but we check to
# make sure. This can particularly happen for the event stream where
# event persisters continuously send `POSITION`. See `resource.py`
# for why this can happen.
@H-Shay (Contributor) commented on May 3, 2022:
Just for my own curiosity/understanding, is this the comment in resource.py that you are referring to?

# If we are replicating an event stream we want to periodically check if

Edit: or is it the comment you added in this PR, which I saw after asking this question?

The PR author (Member) replied:
Yeah, it's that one basically.


logger.info(
"Fetching replication rows for '%s' between %i and %i",
stream_name,
@@ -590,7 +595,7 @@ async def _process_position(
[stream.parse_row(row) for row in rows],
)

logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)
logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)

# We've now caught up to position sent to us, notify handler.
await self._replication_data_handler.on_position(
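The catch-up logic in `_process_position` above can be sketched as follows. This is a minimal, hypothetical illustration, not Synapse's real API: `fetch_updates` stands in for the stream's update fetcher and is assumed to return `(rows, upto_token, limited)`, where `limited` means the fetch was capped and more rows remain.

```python
def process_position(prev_token: int, new_token: int, current_token: int,
                     fetch_updates) -> int:
    """Catch up from our `current_token` to the sender's `new_token`."""
    # If the sender's previous token matches ours, we missed nothing. Note
    # that there may well be no new rows even when the tokens differ (event
    # persisters send POSITION periodically), but we check to make sure.
    missing_updates = prev_token != current_token
    while missing_updates:
        rows, upto_token, limited = fetch_updates(current_token, new_token)
        # ... parse and process `rows` here ...
        current_token = upto_token
        # Loop again only if the fetch was capped before reaching new_token.
        missing_updates = limited
    return new_token
```

In the common case described by the added comment, `fetch_updates` returns no rows at all and the loop exits after a single iteration, which is why the per-POSITION log line was demoted from `info` to `debug`.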
9 changes: 9 additions & 0 deletions synapse/replication/tcp/resource.py
@@ -204,6 +204,15 @@ async def _run_notifier_loop(self) -> None:
# turns out that e.g. account data streams share
# their "current token" with each other, meaning
# that it is *not* safe to send a POSITION.

# Note: `last_token` may not *actually* be the
# last token we sent out in a RDATA or POSITION.
# This can happen if we sent out an RDATA for
# position X when our current token was say X+1.
# Other workers will see RDATA for X and then a
# POSITION with last token of X+1, which will
# cause them to check if there were any missing
# updates between X and X+1.
logger.info(
"Sending position: %s -> %s",
stream.NAME,
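The race described in the added comment can be modeled with a toy helper (assumed names, not Synapse's API): RDATA goes out for position X while the stream's current token is already X+1, so the following POSITION advertises `last_token = X+1` and the receiving worker must check the gap for updates it has not seen.

```python
def receiver_gap(last_rdata_token: int, position_last_token: int) -> range:
    """Tokens a receiving worker must re-check after a POSITION.

    Anything strictly above the last RDATA token, up to and including the
    POSITION's last token, may hold rows the receiver has not yet seen.
    """
    return range(last_rdata_token + 1, position_last_token + 1)

# RDATA went out for X=7 while the current token was already 8, so the
# receiver re-checks token 8:
assert list(receiver_gap(7, 8)) == [8]
# No gap when the advertised token matches the last RDATA:
assert list(receiver_gap(8, 8)) == []
```

This re-check is usually a no-op, which ties back to the handler.py change: the per-check "Handling POSITION" log was too noisy at `info` when multiple event persisters send POSITION continuously.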