This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move update_client_ip background job from the main process to the background worker. #12251

Merged on Apr 1, 2022 (13 commits).
1 change: 1 addition & 0 deletions changelog.d/12251.feature
@@ -0,0 +1 @@
+Offload the `update_client_ip` background job from the main process to the background worker, when using Redis-based replication.
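This entry assumes a deployment that uses Redis-based replication with a designated background worker. A hypothetical `homeserver.yaml` excerpt for such a setup (a sketch only; the worker name is invented, and `redis.enabled` / `run_background_tasks_on` are the two documented options that matter here):

```yaml
# Enable Redis-based replication between the main process and workers.
redis:
  enabled: true

# Delegate background tasks (after this PR, including the client-IP
# batching job) to a designated worker instead of the main process.
run_background_tasks_on: background_worker1
```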
2 changes: 0 additions & 2 deletions synapse/app/admin_cmd.py
@@ -33,7 +33,6 @@
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
-from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.events import SlavedEventStore
@@ -61,7 +60,6 @@ class AdminCmdSlavedStore(
     SlavedDeviceStore,
     SlavedPushRuleStore,
     SlavedEventStore,
-    SlavedClientIpStore,
     BaseSlavedStore,
     RoomWorkerStore,
 ):
2 changes: 0 additions & 2 deletions synapse/app/generic_worker.py
@@ -53,7 +53,6 @@
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
-from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
@@ -247,7 +246,6 @@ class GenericWorkerSlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedProfileStore,
-    SlavedClientIpStore,
     SlavedFilteringStore,
     MonthlyActiveUsersWorkerStore,
     MediaRepositoryStore,
59 changes: 0 additions & 59 deletions synapse/replication/slave/storage/client_ips.py

This file was deleted.

8 changes: 7 additions & 1 deletion synapse/replication/tcp/commands.py
@@ -356,7 +356,7 @@ def __init__(
         access_token: str,
         ip: str,
         user_agent: str,
-        device_id: str,
+        device_id: Optional[str],
         last_seen: int,
     ):
         self.user_id = user_id
@@ -389,6 +389,12 @@ def to_line(self) -> str:
             )
         )

+    def __repr__(self) -> str:
+        return (
+            f"UserIpCommand({self.user_id!r}, .., {self.ip!r}, "
+            f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
+        )
+

 class RemoteServerUpCommand(_SimpleCommand):
     """Sent when a worker has detected that a remote server is no longer
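Two details of this diff are worth spelling out: `device_id` becomes `Optional[str]` since a request need not be tied to a device, and the new `__repr__` elides the access token (the `..`), which keeps credentials out of logs whenever a command is printed. A minimal sketch of the same pattern on a simplified stand-in class (the class name and wire format below are illustrative, not Synapse's actual implementation):

```python
import json
from typing import Optional


class UserIpCommandSketch:
    """Illustrative stand-in for a replication command carrying client IP data."""

    NAME = "USER_IP"

    def __init__(self, user_id: str, access_token: str, ip: str,
                 user_agent: str, device_id: Optional[str], last_seen: int):
        self.user_id = user_id
        self.access_token = access_token
        self.ip = ip
        self.user_agent = user_agent
        self.device_id = device_id  # may be None: not every request has one
        self.last_seen = last_seen

    def to_line(self) -> str:
        # Hypothetical serialization: user_id, then a JSON payload.
        return self.user_id + " " + json.dumps(
            (self.access_token, self.ip, self.user_agent,
             self.device_id, self.last_seen)
        )

    def __repr__(self) -> str:
        # The access token is deliberately elided (the "..") so that
        # repr() is always safe to log.
        return (
            f"UserIpCommandSketch({self.user_id!r}, .., {self.ip!r}, "
            f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
        )


cmd = UserIpCommandSketch("@a:hs", "secret-token", "10.0.0.1", "curl", None, 0)
print(repr(cmd))  # no access token appears in the output
```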
30 changes: 25 additions & 5 deletions synapse/replication/tcp/handler.py
@@ -235,6 +235,14 @@ def __init__(self, hs: "HomeServer"):
         if self._is_master:
             self._server_notices_sender = hs.get_server_notices_sender()

+        if hs.config.redis.redis_enabled:
+            # If we're using Redis, it's the background worker that should
+            # receive USER_IP commands and store the relevant client IPs.
+            self._should_insert_client_ips = hs.config.worker.run_background_tasks
+        else:
+            # If we're NOT using Redis, this must be handled by the master
+            self._should_insert_client_ips = hs.get_instance_name() == "master"
+
     def _add_command_to_stream_queue(
         self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
     ) -> None:
@@ -401,12 +409,27 @@ def on_USER_IP(
     ) -> Optional[Awaitable[None]]:
         user_ip_cache_counter.inc()

-        if self._is_master:
+        if self._is_master or self._should_insert_client_ips:
             return self._handle_user_ip(cmd)
         else:
             return None

     async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
+        """
+        Handles a User IP, branching depending on whether we are the main process
+        and/or the background worker.
+        """
+        if self._is_master:
+            await self._handle_user_ip_as_master(cmd)
+
+        if self._should_insert_client_ips:
+            await self._handle_user_ip_as_background_worker(cmd)
Inline review thread on the _handle_user_ip split:

Member: I think it would be clearer to inline _handle_user_ip into on_USER_IP? Otherwise it's a bit confusing.

Contributor (author): My only concern with that is that it would mean uninvolved workers end up creating and scheduling coroutines as background processes as well, whereas they didn't before. I don't know how much that worries us, but since the code was already structured this way for some reason, I didn't want to remove it without understanding why.

Member: I'm confused; I'm not talking about a functional change here. _handle_user_ip is only called by on_USER_IP? What's really confusing me is that the self._is_master or self._should_insert_client_ips check a) looks odd by itself, and b) is entirely redundant, given that _handle_user_ip then does the exact same checks. You may as well always call self._handle_user_ip in on_USER_IP, at which point having it as a separate function seems a bit silly. Or am I missing something here?

Contributor (author): on_USER_IP returns either None or an awaitable. If it's an awaitable, it goes through extra machinery to be scheduled as a background process; if it's just None, nothing happens. Notably, the function can't (easily or nicely) be inlined while preserving this behaviour of returning None when there's nothing to do, as I'd have to make on_USER_IP an async def in order to run potentially two coroutines back-to-back. It may not be a big deal, but I wasn't sure, and since replication overhead is pretty sensitive, I was uncomfortable taking a leap of faith.

Member: OH. Right, with you now. How obscure.

Contributor (author): Yes, and it took me forever to find the code that calls on_%s, because the name is dynamically constructed and I forgot that we like %s-formatting, so I was searching for "on_" :')

Member: Can you add a comment in on_USER_IP about the whole awaitable thing, please?

Contributor (author): Done. I also inlined the two specific _handle_user_ip_as_* functions, since looking at them now, they're so short that the split wasn't buying any clarity.
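For readers who hit the same wall: the "awaitable thing" above comes from the replication command handler looking up on_<COMMAND_NAME> dynamically and only scheduling background work when the handler returns something. A minimal sketch of that dispatch pattern, using plain asyncio in place of Synapse's Twisted background-process machinery (all names and structure here are illustrative, not Synapse's actual code):

```python
import asyncio
from typing import Any, Awaitable, Optional


class UserIpCmd:
    """Stand-in for a parsed replication command."""
    NAME = "USER_IP"


class DispatcherSketch:
    def __init__(self, involved: bool) -> None:
        # Stands in for the is-master / designated-background-worker checks.
        self.involved = involved

    def handle_command(self, cmd: Any) -> None:
        # The handler name is built with %s-formatting at runtime, which is
        # why grepping for "on_USER_IP" never finds this call site.
        handler = getattr(self, "on_%s" % (cmd.NAME,), None)
        res = handler(cmd) if handler is not None else None
        if res is not None:
            # Only an awaitable triggers scheduling; a None return costs nothing.
            asyncio.ensure_future(res)

    def on_USER_IP(self, cmd: UserIpCmd) -> Optional[Awaitable[None]]:
        if not self.involved:
            # Uninvolved workers return None: no coroutine object is even
            # created, so no background process is spun up.
            return None
        return self._handle_user_ip(cmd)  # a coroutine object, not yet running

    async def _handle_user_ip(self, cmd: UserIpCmd) -> None:
        ...  # store the client IP and/or notify the server notices sender


async def main() -> None:
    DispatcherSketch(involved=True).handle_command(UserIpCmd())
    await asyncio.sleep(0)  # give the scheduled task a chance to run


asyncio.run(main())
```

The author's point above falls out directly: on an uninvolved worker, on_USER_IP returns None before any coroutine exists, so nothing is created or scheduled.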


+    async def _handle_user_ip_as_master(self, cmd: UserIpCommand) -> None:
+        assert self._server_notices_sender is not None
+        await self._server_notices_sender.on_user_ip(cmd.user_id)
+
+    async def _handle_user_ip_as_background_worker(self, cmd: UserIpCommand) -> None:
         await self._store.insert_client_ip(
             cmd.user_id,
             cmd.access_token,
@@ -416,9 +439,6 @@ async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
             cmd.last_seen,
         )

-        assert self._server_notices_sender is not None
-        await self._server_notices_sender.on_user_ip(cmd.user_id)
-
     def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
         if cmd.instance_name == self._instance_name:
             # Ignore RDATA that are just our own echoes
@@ -698,7 +718,7 @@ def send_user_ip(
         access_token: str,
         ip: str,
         user_agent: str,
-        device_id: str,
+        device_id: Optional[str],
         last_seen: int,
     ) -> None:
         """Tell the master that the user made a request."""
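The same Redis-or-master rule appears twice in this PR: here in the command handler, and again in the storage layer below. As a pure function it reduces to a few lines; a sketch with example outcomes (function and argument names are illustrative):

```python
def should_handle_client_ips(redis_enabled: bool,
                             run_background_tasks: bool,
                             instance_name: str) -> bool:
    """Illustrative predicate: which process persists USER_IP data?

    With Redis-based replication, workers receive USER_IP commands, so the
    job can live on the designated background worker; without Redis, only
    the main ("master") process is in a position to handle it.
    """
    if redis_enabled:
        return run_background_tasks
    return instance_name == "master"


# A background worker on a Redis deployment takes the job:
assert should_handle_client_ips(True, True, "bg_worker") is True
# The main process keeps it when Redis is off:
assert should_handle_client_ips(False, False, "master") is True
# Any other worker stays out of the write path:
assert should_handle_client_ips(True, False, "frontend1") is False
```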
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/__init__.py
@@ -33,7 +33,7 @@
 from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
 from .cache import CacheInvalidationWorkerStore
 from .censor_events import CensorEventsStore
-from .client_ips import ClientIpStore
+from .client_ips import ClientIpWorkerStore
 from .deviceinbox import DeviceInboxStore
 from .devices import DeviceStore
 from .directory import DirectoryStore
@@ -49,7 +49,7 @@
 from .lock import LockStore
 from .media_repository import MediaRepositoryStore
 from .metrics import ServerMetricsStore
-from .monthly_active_users import MonthlyActiveUsersStore
+from .monthly_active_users import MonthlyActiveUsersWorkerStore
 from .openid import OpenIdStore
 from .presence import PresenceStore
 from .profile import ProfileStore
@@ -112,13 +112,13 @@ class DataStore(
     AccountDataStore,
     EventPushActionsStore,
     OpenIdStore,
-    ClientIpStore,
+    ClientIpWorkerStore,
     DeviceStore,
     DeviceInboxStore,
     UserDirectoryStore,
     GroupServerStore,
     UserErasureStore,
-    MonthlyActiveUsersStore,
+    MonthlyActiveUsersWorkerStore,
     StatsStore,
     RelationsStore,
     CensorEventsStore,
101 changes: 65 additions & 36 deletions synapse/storage/databases/main/client_ips.py
@@ -25,7 +25,9 @@
     LoggingTransaction,
     make_tuple_comparison_clause,
 )
-from synapse.storage.databases.main.monthly_active_users import MonthlyActiveUsersStore
+from synapse.storage.databases.main.monthly_active_users import (
+    MonthlyActiveUsersWorkerStore,
+)
 from synapse.types import JsonDict, UserID
 from synapse.util.caches.lrucache import LruCache
@@ -397,7 +399,7 @@ def _devices_last_seen_update_txn(txn: LoggingTransaction) -> int:
         return updated


-class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
+class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -406,11 +408,40 @@ def __init__(
     ):
         super().__init__(database, db_conn, hs)

+        if hs.config.redis.redis_enabled:
+            # If we're using Redis, we can shift this update process off to
+            # the background worker
+            self._update_on_this_worker = hs.config.worker.run_background_tasks
+        else:
+            # If we're NOT using Redis, this must be handled by the master
+            self._update_on_this_worker = hs.get_instance_name() == "master"
+
         self.user_ips_max_age = hs.config.server.user_ips_max_age

+        # (user_id, access_token, ip,) -> last_seen
+        self.client_ip_last_seen = LruCache[Tuple[str, str, str], int](
+            cache_name="client_ip_last_seen", max_size=50000
+        )
+
         if hs.config.worker.run_background_tasks and self.user_ips_max_age:
             self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

+        if self._update_on_this_worker:
+            # This is the designated worker that can write to the client IP
+            # tables.
+
+            # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
+            self._batch_row_update: Dict[
+                Tuple[str, str, str], Tuple[str, Optional[str], int]
+            ] = {}
+
+            self._client_ip_looper = self._clock.looping_call(
+                self._update_client_ips_batch, 5 * 1000
+            )
+            self.hs.get_reactor().addSystemEventTrigger(
+                "before", "shutdown", self._update_client_ips_batch
+            )
+
     @wrap_as_background_process("prune_old_user_ips")
     async def _prune_old_user_ips(self) -> None:
         """Removes entries in user IPs older than the configured period."""
@@ -456,7 +487,7 @@ def _prune_old_user_ips_txn(txn: LoggingTransaction) -> None:
             "_prune_old_user_ips", _prune_old_user_ips_txn
         )

-    async def get_last_client_ip_by_device(
+    async def _get_last_client_ip_by_device_from_database(
         self, user_id: str, device_id: Optional[str]
     ) -> Dict[Tuple[str, str], DeviceLastConnectionInfo]:
         """For each device_id listed, give the user_ip it was last seen on.
@@ -487,7 +518,7 @@

         return {(d["user_id"], d["device_id"]): d for d in res}

-    async def get_user_ip_and_agents(
+    async def _get_user_ip_and_agents_from_database(
         self, user: UserID, since_ts: int = 0
     ) -> List[LastConnectionInfo]:
         """Fetch the IPs and user agents for a user since the given timestamp.
@@ -539,34 +570,6 @@ def get_recent(txn: LoggingTransaction) -> List[Tuple[str, str, str, int]]:
             for access_token, ip, user_agent, last_seen in rows
         ]


-class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersStore):
-    def __init__(
-        self,
-        database: DatabasePool,
-        db_conn: LoggingDatabaseConnection,
-        hs: "HomeServer",
-    ):
-
-        # (user_id, access_token, ip,) -> last_seen
-        self.client_ip_last_seen = LruCache[Tuple[str, str, str], int](
-            cache_name="client_ip_last_seen", max_size=50000
-        )
-
-        super().__init__(database, db_conn, hs)
-
-        # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
-        self._batch_row_update: Dict[
-            Tuple[str, str, str], Tuple[str, Optional[str], int]
-        ] = {}
-
-        self._client_ip_looper = self._clock.looping_call(
-            self._update_client_ips_batch, 5 * 1000
-        )
-        self.hs.get_reactor().addSystemEventTrigger(
-            "before", "shutdown", self._update_client_ips_batch
-        )

     async def insert_client_ip(
         self,
         user_id: str,
@@ -584,17 +587,27 @@ async def insert_client_ip(
             last_seen = self.client_ip_last_seen.get(key)
         except KeyError:
             last_seen = None
-        await self.populate_monthly_active_users(user_id)

         # Rate-limited inserts
         if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
             return

         self.client_ip_last_seen.set(key, now)

-        self._batch_row_update[key] = (user_agent, device_id, now)
+        if self._update_on_this_worker:
+            await self.populate_monthly_active_users(user_id)
+            self._batch_row_update[key] = (user_agent, device_id, now)
+        else:
+            # We are not the designated writer-worker, so stream over replication
+            self.hs.get_replication_command_handler().send_user_ip(
+                user_id, access_token, ip, user_agent, device_id, now
+            )

@wrap_as_background_process("update_client_ips")
async def _update_client_ips_batch(self) -> None:
assert (
self._update_on_this_worker
), "This worker is not designated to update client IPs"

# If the DB pool has already terminated, don't try updating
if not self.db_pool.is_running():
@@ -612,6 +625,10 @@ def _update_client_ips_batch_txn(
         txn: LoggingTransaction,
         to_update: Mapping[Tuple[str, str, str], Tuple[str, Optional[str], int]],
     ) -> None:
+        assert (
+            self._update_on_this_worker
+        ), "This worker is not designated to update client IPs"
+
         if "user_ips" in self.db_pool._unsafe_to_upsert_tables or (
             not self.database_engine.can_native_upsert
         ):
@@ -662,7 +679,12 @@ async def get_last_client_ip_by_device(
             A dictionary mapping a tuple of (user_id, device_id) to dicts, with
             keys giving the column names from the devices table.
         """
-        ret = await super().get_last_client_ip_by_device(user_id, device_id)
+        ret = await self._get_last_client_ip_by_device_from_database(user_id, device_id)
+
+        if not self._update_on_this_worker:
+            # Only the writing-worker has additional in-memory data to enhance
+            # the result
+            return ret

         # Update what is retrieved from the database with data which is pending
         # insertion, as if it has already been stored in the database.
@@ -707,9 +729,16 @@ async def get_user_ip_and_agents(
         Only the latest user agent for each access token and IP address combination
         is available.
         """
+        rows_from_db = await self._get_user_ip_and_agents_from_database(user, since_ts)
+
+        if not self._update_on_this_worker:
+            # Only the writing-worker has additional in-memory data to enhance
+            # the result
+            return rows_from_db
+
         results: Dict[Tuple[str, str], LastConnectionInfo] = {
             (connection["access_token"], connection["ip"]): connection
-            for connection in await super().get_user_ip_and_agents(user, since_ts)
+            for connection in rows_from_db
         }

         # Overlay data that is pending insertion on top of the results from the
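Putting this file's pieces together: an in-memory last-seen cache rate-limits writes per (user_id, access_token, ip) key, the designated writer buffers rows and flushes them in one batched transaction every five seconds (and once more at shutdown), and every other worker streams the data over replication instead of touching the database. A condensed sketch of that write path, using plain asyncio in place of Synapse's Twisted looping_call (all names are illustrative):

```python
import asyncio
import time
from typing import Dict, Optional, Tuple

LAST_SEEN_GRANULARITY_MS = 120_000  # assumed rate-limit window


class ClientIpWriterSketch:
    def __init__(self, is_designated_writer: bool) -> None:
        self.is_designated_writer = is_designated_writer
        # (user_id, access_token, ip) -> last_seen ms; stands in for the LruCache
        self.last_seen: Dict[Tuple[str, str, str], int] = {}
        # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
        self.pending: Dict[Tuple[str, str, str], Tuple[str, Optional[str], int]] = {}

    def insert_client_ip(self, user_id: str, access_token: str, ip: str,
                         user_agent: str, device_id: Optional[str]) -> None:
        now = int(time.time() * 1000)
        key = (user_id, access_token, ip)
        prev = self.last_seen.get(key)
        if prev is not None and now - prev < LAST_SEEN_GRANULARITY_MS:
            return  # rate-limited: this key was recorded recently
        self.last_seen[key] = now
        if self.is_designated_writer:
            # Buffer in memory; the flush loop persists the batch later.
            self.pending[key] = (user_agent, device_id, now)
        else:
            # Not the designated writer: stream the data over replication.
            self._send_user_ip(key, user_agent, device_id, now)

    async def flush_loop(self, interval_s: float = 5.0) -> None:
        # Stands in for _clock.looping_call(_update_client_ips_batch, 5 * 1000);
        # the real code also flushes once more at shutdown.
        while True:
            await asyncio.sleep(interval_s)
            to_update, self.pending = self.pending, {}  # grab the batch atomically
            if to_update:
                self._upsert_rows(to_update)

    def _send_user_ip(self, key, user_agent, device_id, now) -> None:
        print("replication: USER_IP", key)  # i.e. send_user_ip(...)

    def _upsert_rows(self, rows) -> None:
        print("db: upserting", len(rows), "rows in one transaction")
```

The batch swap (to_update, self.pending = self.pending, {}) mirrors the real code's shape: the whole batch is taken at once, so new requests keep accumulating in a fresh dict while the transaction runs.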