Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge commit '7000a215e' into anoa/dinsic_release_1_18_x
Browse files Browse the repository at this point in the history
* commit '7000a215e':
  1.18.0rc2
  Typing worker needs to handle stream update requests (#7967)
  Handle replication commands synchronously where possible (#7876)
  update changelog
  • Loading branch information
anoadragon453 committed Aug 4, 2020
2 parents 7f2e76b + 7000a21 commit b7c5713
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 104 deletions.
36 changes: 20 additions & 16 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
Synapse 1.18.0rc2 (2020-07-28)
==============================

Bugfixes
--------

- Fix an `AssertionError` exception introduced in v1.18.0rc1. ([\#7876](https://github.com/matrix-org/synapse/issues/7876))
- Fix experimental support for moving typing off master when worker is restarted, which is broken in v1.18.0rc1. ([\#7967](https://github.com/matrix-org/synapse/issues/7967))


Internal Changes
----------------

- Further optimise queueing of inbound replication commands. ([\#7876](https://github.com/matrix-org/synapse/issues/7876))


Synapse 1.18.0rc1 (2020-07-27)
==============================

Expand All @@ -13,7 +29,7 @@ Features
- Allow email subjects to be customised through Synapse's configuration. ([\#7846](https://github.com/matrix-org/synapse/issues/7846))
- Add the ability to re-activate an account from the admin API. ([\#7847](https://github.com/matrix-org/synapse/issues/7847), [\#7908](https://github.com/matrix-org/synapse/issues/7908))
- Add experimental support for running multiple pusher workers. ([\#7855](https://github.com/matrix-org/synapse/issues/7855))
- Add experimental support for moving typing off master. ([\#7869](https://github.com/matrix-org/synapse/issues/7869))
- Add experimental support for moving typing off master. ([\#7869](https://github.com/matrix-org/synapse/issues/7869), [\#7959](https://github.com/matrix-org/synapse/issues/7959))
- Report CPU metrics to prometheus for time spent processing replication commands. ([\#7879](https://github.com/matrix-org/synapse/issues/7879))
- Support oEmbed for media previews. ([\#7920](https://github.com/matrix-org/synapse/issues/7920))
- Abort federation requests where the client disconnects before the ratelimiter expires. ([\#7930](https://github.com/matrix-org/synapse/issues/7930))
Expand All @@ -35,7 +51,6 @@ Bugfixes
- Fix a long standing bug where the tracing of async functions with opentracing was broken. ([\#7872](https://github.com/matrix-org/synapse/issues/7872), [\#7961](https://github.com/matrix-org/synapse/issues/7961))
- Fix "TypeError in `synapse.notifier`" exceptions. ([\#7880](https://github.com/matrix-org/synapse/issues/7880))
- Fix deprecation warning due to invalid escape sequences. ([\#7895](https://github.com/matrix-org/synapse/issues/7895))
- Add experimental support for moving typing off master. ([\#7959](https://github.com/matrix-org/synapse/issues/7959))


Updates to the Docker image
Expand All @@ -57,48 +72,37 @@ Deprecations and Removals
-------------------------

- Remove unused `synapse_replication_tcp_resource_invalidate_cache` prometheus metric. ([\#7878](https://github.com/matrix-org/synapse/issues/7878))
- Remove Ubuntu Eoan from the list of `.deb` packages that we build as it is now end-of-life. Contributed by @gary-kim. ([\#7888](https://github.com/matrix-org/synapse/issues/7888))


Internal Changes
----------------

- Switch from simplejson to the standard library json. ([\#7802](https://github.com/matrix-org/synapse/issues/7802))
- Switch parts of the codebase from `simplejson` to the standard library `json`. ([\#7802](https://github.com/matrix-org/synapse/issues/7802))
- Add type hints to the http server code and remove an unused parameter. ([\#7813](https://github.com/matrix-org/synapse/issues/7813))
- Add type hints to synapse.api.errors module. ([\#7820](https://github.com/matrix-org/synapse/issues/7820))
- Ensure that calls to `json.dumps` are compatible with the standard library json. ([\#7836](https://github.com/matrix-org/synapse/issues/7836))
- Remove redundant `retry_on_integrity_error` wrapper for event persistence code. ([\#7848](https://github.com/matrix-org/synapse/issues/7848))
- Consistently use `db_to_json` to convert from database values to JSON objects. ([\#7849](https://github.com/matrix-org/synapse/issues/7849))
- Convert E2E keys and room keys handlers to async/await. ([\#7851](https://github.com/matrix-org/synapse/issues/7851))
- Convert various parts of the codebase to async/await. ([\#7851](https://github.com/matrix-org/synapse/issues/7851), [\#7860](https://github.com/matrix-org/synapse/issues/7860), [\#7868](https://github.com/matrix-org/synapse/issues/7868), [\#7871](https://github.com/matrix-org/synapse/issues/7871), [\#7873](https://github.com/matrix-org/synapse/issues/7873), [\#7874](https://github.com/matrix-org/synapse/issues/7874), [\#7884](https://github.com/matrix-org/synapse/issues/7884), [\#7912](https://github.com/matrix-org/synapse/issues/7912), [\#7935](https://github.com/matrix-org/synapse/issues/7935), [\#7939](https://github.com/matrix-org/synapse/issues/7939), [\#7942](https://github.com/matrix-org/synapse/issues/7942), [\#7944](https://github.com/matrix-org/synapse/issues/7944))
- Add support for handling registration requests across multiple client reader workers. ([\#7853](https://github.com/matrix-org/synapse/issues/7853))
- Small performance improvement in typing processing. ([\#7856](https://github.com/matrix-org/synapse/issues/7856))
- The default value of `filter_timeline_limit` was changed from -1 (no limit) to 100. ([\#7858](https://github.com/matrix-org/synapse/issues/7858))
- Convert _base, profile, and _receipts handlers to async/await. ([\#7860](https://github.com/matrix-org/synapse/issues/7860))
- Optimise queueing of inbound replication commands. ([\#7861](https://github.com/matrix-org/synapse/issues/7861))
- Convert synapse.app and federation client to async/await. ([\#7868](https://github.com/matrix-org/synapse/issues/7868))
- Add some type annotations to `HomeServer` and `BaseHandler`. ([\#7870](https://github.com/matrix-org/synapse/issues/7870))
- Convert device handler to async/await. ([\#7871](https://github.com/matrix-org/synapse/issues/7871))
- Convert more media code to async/await. ([\#7873](https://github.com/matrix-org/synapse/issues/7873))
- Convert the federation agent and related code to async/await. ([\#7874](https://github.com/matrix-org/synapse/issues/7874))
- Clean up `PreserveLoggingContext`. ([\#7877](https://github.com/matrix-org/synapse/issues/7877))
- Change "unknown room version" logging from 'error' to 'warning'. ([\#7881](https://github.com/matrix-org/synapse/issues/7881))
- Stop using `device_max_stream_id` table and just use `device_inbox.stream_id`. ([\#7882](https://github.com/matrix-org/synapse/issues/7882))
- Convert the message handler to async/await. ([\#7884](https://github.com/matrix-org/synapse/issues/7884))
- Return an empty body for OPTIONS requests. ([\#7886](https://github.com/matrix-org/synapse/issues/7886))
- Remove Ubuntu Eoan from the list of `.deb` packages that we build as it is now end-of-life. Contributed by @gary-kim. ([\#7888](https://github.com/matrix-org/synapse/issues/7888))
- Fix typo in generated config file. Contributed by @ThiefMaster. ([\#7890](https://github.com/matrix-org/synapse/issues/7890))
- Import ABC from `collections.abc` for Python 3.10 compatibility. ([\#7892](https://github.com/matrix-org/synapse/issues/7892))
- Remove unused functions `time_function`, `trace_function`, `get_previous_frames`
and `get_previous_frame` from `synapse.logging.utils` module. ([\#7897](https://github.com/matrix-org/synapse/issues/7897))
- Convert `RoomListHandler` to async/await. ([\#7912](https://github.com/matrix-org/synapse/issues/7912))
- Lint the `contrib/` directory in CI and linting scripts, add `synctl` to the linting script for consistency with CI. ([\#7914](https://github.com/matrix-org/synapse/issues/7914))
- Use Element CSS and logo in notification emails when app name is Element. ([\#7919](https://github.com/matrix-org/synapse/issues/7919))
- Optimisation to /sync handling: skip serializing the response if the client has already disconnected. ([\#7927](https://github.com/matrix-org/synapse/issues/7927))
- When a client disconnects, don't log it as 'Error processing request'. ([\#7928](https://github.com/matrix-org/synapse/issues/7928))
- Add debugging to `/sync` response generation (disabled by default). ([\#7929](https://github.com/matrix-org/synapse/issues/7929))
- Convert the auth providers to be async/await. ([\#7935](https://github.com/matrix-org/synapse/issues/7935))
- Convert presence handler helpers to async/await. ([\#7939](https://github.com/matrix-org/synapse/issues/7939))
- Convert state resolution to async/await. ([\#7942](https://github.com/matrix-org/synapse/issues/7942))
- Convert the interactive_auth_handler wrapper to async/await. ([\#7944](https://github.com/matrix-org/synapse/issues/7944))
- Update comments that refer to Deferreds for async functions. ([\#7945](https://github.com/matrix-org/synapse/issues/7945))
- Simplify error handling in federation handler. ([\#7950](https://github.com/matrix-org/synapse/issues/7950))

Expand Down
2 changes: 1 addition & 1 deletion synapse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
except ImportError:
pass

__version__ = "1.18.0rc1"
__version__ = "1.18.0rc2"

if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ def register_servlets(self, hs):
federation.register_servlets(hs, self)
presence.register_servlets(hs, self)
membership.register_servlets(hs, self)
streams.register_servlets(hs, self)

# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
streams.register_servlets(hs, self)
115 changes: 66 additions & 49 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import logging
from typing import (
Any,
Awaitable,
Dict,
Iterable,
Iterator,
Expand All @@ -33,6 +34,7 @@
from twisted.internet.protocol import ReconnectingClientFactory

from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Expand Down Expand Up @@ -152,7 +154,7 @@ def __init__(self, hs):
# When POSITION or RDATA commands arrive, we stick them in a queue and process
# them in order in a separate background process.

# the streams which are currently being processed by _unsafe_process_stream
# the streams which are currently being processed by _unsafe_process_queue
self._processing_streams = set() # type: Set[str]

# for each stream, a queue of commands that are awaiting processing, and the
Expand Down Expand Up @@ -185,7 +187,7 @@ def __init__(self, hs):
if self._is_master:
self._server_notices_sender = hs.get_server_notices_sender()

async def _add_command_to_stream_queue(
def _add_command_to_stream_queue(
self, conn: AbstractConnection, cmd: Union[RdataCommand, PositionCommand]
) -> None:
"""Queue the given received command for processing
Expand All @@ -199,33 +201,34 @@ async def _add_command_to_stream_queue(
logger.error("Got %s for unknown stream: %s", cmd.NAME, stream_name)
return

# if we're already processing this stream, stick the new command in the
# queue, and we're done.
queue.append((cmd, conn))

# if we're already processing this stream, there's nothing more to do:
# the new entry on the queue will get picked up in due course
if stream_name in self._processing_streams:
queue.append((cmd, conn))
return

# otherwise, process the new command.
# fire off a background process to start processing the queue.
run_as_background_process(
"process-replication-data", self._unsafe_process_queue, stream_name
)

# arguably we should start off a new background process here, but nothing
# will be too upset if we don't return for ages, so let's save the overhead
# and use the existing logcontext.
async def _unsafe_process_queue(self, stream_name: str):
"""Processes the command queue for the given stream, until it is empty
Does not check if there is already a thread processing the queue, hence "unsafe"
"""
assert stream_name not in self._processing_streams

self._processing_streams.add(stream_name)
try:
# might as well skip the queue for this one, since it must be empty
assert not queue
await self._process_command(cmd, conn, stream_name)

# now process any other commands that have built up while we were
# dealing with that one.
queue = self._command_queues_by_stream.get(stream_name)
while queue:
cmd, conn = queue.popleft()
try:
await self._process_command(cmd, conn, stream_name)
except Exception:
logger.exception("Failed to handle command %s", cmd)

finally:
self._processing_streams.discard(stream_name)

Expand Down Expand Up @@ -299,7 +302,7 @@ def get_streams_to_replicate(self) -> List[Stream]:
"""
return self._streams_to_replicate

async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
self.send_positions_to_connection(conn)

def send_positions_to_connection(self, conn: AbstractConnection):
Expand All @@ -318,57 +321,73 @@ def send_positions_to_connection(self, conn: AbstractConnection):
)
)

async def on_USER_SYNC(self, conn: AbstractConnection, cmd: UserSyncCommand):
def on_USER_SYNC(
self, conn: AbstractConnection, cmd: UserSyncCommand
) -> Optional[Awaitable[None]]:
user_sync_counter.inc()

if self._is_master:
await self._presence_handler.update_external_syncs_row(
return self._presence_handler.update_external_syncs_row(
cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)
else:
return None

async def on_CLEAR_USER_SYNC(
def on_CLEAR_USER_SYNC(
self, conn: AbstractConnection, cmd: ClearUserSyncsCommand
):
) -> Optional[Awaitable[None]]:
if self._is_master:
await self._presence_handler.update_external_syncs_clear(cmd.instance_id)
return self._presence_handler.update_external_syncs_clear(cmd.instance_id)
else:
return None

async def on_FEDERATION_ACK(
self, conn: AbstractConnection, cmd: FederationAckCommand
):
def on_FEDERATION_ACK(self, conn: AbstractConnection, cmd: FederationAckCommand):
federation_ack_counter.inc()

if self._federation_sender:
self._federation_sender.federation_ack(cmd.instance_name, cmd.token)

async def on_REMOVE_PUSHER(
def on_REMOVE_PUSHER(
self, conn: AbstractConnection, cmd: RemovePusherCommand
):
) -> Optional[Awaitable[None]]:
remove_pusher_counter.inc()

if self._is_master:
await self._store.delete_pusher_by_app_id_pushkey_user_id(
app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
)
return self._handle_remove_pusher(cmd)
else:
return None

async def _handle_remove_pusher(self, cmd: RemovePusherCommand):
await self._store.delete_pusher_by_app_id_pushkey_user_id(
app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
)

self._notifier.on_new_replication_data()
self._notifier.on_new_replication_data()

async def on_USER_IP(self, conn: AbstractConnection, cmd: UserIpCommand):
def on_USER_IP(
self, conn: AbstractConnection, cmd: UserIpCommand
) -> Optional[Awaitable[None]]:
user_ip_cache_counter.inc()

if self._is_master:
await self._store.insert_client_ip(
cmd.user_id,
cmd.access_token,
cmd.ip,
cmd.user_agent,
cmd.device_id,
cmd.last_seen,
)
return self._handle_user_ip(cmd)
else:
return None

async def _handle_user_ip(self, cmd: UserIpCommand):
await self._store.insert_client_ip(
cmd.user_id,
cmd.access_token,
cmd.ip,
cmd.user_agent,
cmd.device_id,
cmd.last_seen,
)

if self._server_notices_sender:
await self._server_notices_sender.on_user_ip(cmd.user_id)
assert self._server_notices_sender is not None
await self._server_notices_sender.on_user_ip(cmd.user_id)

async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
if cmd.instance_name == self._instance_name:
# Ignore RDATA that are just our own echoes
return
Expand All @@ -382,7 +401,7 @@ async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
# 2. so we don't race with getting a POSITION command and fetching
# missing RDATA.

await self._add_command_to_stream_queue(conn, cmd)
self._add_command_to_stream_queue(conn, cmd)

async def _process_rdata(
self, stream_name: str, conn: AbstractConnection, cmd: RdataCommand
Expand Down Expand Up @@ -459,14 +478,14 @@ async def on_rdata(
stream_name, instance_name, token, rows
)

async def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):
def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):
if cmd.instance_name == self._instance_name:
# Ignore POSITION that are just our own echoes
return

logger.info("Handling '%s %s'", cmd.NAME, cmd.to_line())

await self._add_command_to_stream_queue(conn, cmd)
self._add_command_to_stream_queue(conn, cmd)

async def _process_position(
self, stream_name: str, conn: AbstractConnection, cmd: PositionCommand
Expand Down Expand Up @@ -526,9 +545,7 @@ async def _process_position(

self._streams_by_connection.setdefault(conn, set()).add(stream_name)

async def on_REMOTE_SERVER_UP(
self, conn: AbstractConnection, cmd: RemoteServerUpCommand
):
def on_REMOTE_SERVER_UP(self, conn: AbstractConnection, cmd: RemoteServerUpCommand):
""""Called when get a new REMOTE_SERVER_UP command."""
self._replication_data_handler.on_remote_server_up(cmd.data)

Expand Down
Loading

0 comments on commit b7c5713

Please sign in to comment.