Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Make cleaning up pushers depend on the device_id instead of the token…
Browse files Browse the repository at this point in the history
…_id (#15280)

This makes it so that we rely on the `device_id` to delete pushers on logout,
instead of relying on the `access_token_id`. This ensures we're not removing
pushers on token refresh, and prepares for a world without access token IDs
(also known as the OIDC).

This actually runs the `set_device_id_for_pushers` background update, which
was forgotten in #13831.

Note that for backwards compatibility it still deletes pushers based on the
`access_token` until the background update finishes.
  • Loading branch information
sandhose authored Mar 24, 2023
1 parent 68a6717 commit 5b70f24
Show file tree
Hide file tree
Showing 15 changed files with 142 additions and 65 deletions.
1 change: 1 addition & 0 deletions changelog.d/15280.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make the pushers rely on the `device_id` instead of the `access_token_id` for various operations.
6 changes: 5 additions & 1 deletion synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@
MediaRepositoryBackgroundUpdateStore,
)
from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
from synapse.storage.databases.main.pusher import PusherWorkerStore
from synapse.storage.databases.main.pusher import (
PusherBackgroundUpdatesStore,
PusherWorkerStore,
)
from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
Expand Down Expand Up @@ -226,6 +229,7 @@ class Store(
AccountDataWorkerStore,
PushRuleStore,
PusherWorkerStore,
PusherBackgroundUpdatesStore,
PresenceBackgroundUpdateStore,
ReceiptsBackgroundUpdateStore,
RelationsWorkerStore,
Expand Down
8 changes: 6 additions & 2 deletions synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -1504,8 +1504,10 @@ async def delete_access_token(self, access_token: str) -> None:
)

# delete pushers associated with this access token
# XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
# background update completes.
if token.token_id is not None:
await self.hs.get_pusherpool().remove_pushers_by_access_token(
await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
token.user_id, (token.token_id,)
)

Expand Down Expand Up @@ -1535,7 +1537,9 @@ async def delete_access_tokens_for_user(
)

# delete pushers associated with the access tokens
await self.hs.get_pusherpool().remove_pushers_by_access_token(
# XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
# background update completes.
await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
user_id, (token_id for _, token_id, _ in tokens_and_devices)
)

Expand Down
2 changes: 2 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
else:
raise

await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)

# Delete data specific to each device. Not optimised as it is not
# considered as part of a critical path.
for device_id in device_ids:
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,11 +1013,11 @@ async def _register_email_threepid(
user_tuple = await self.store.get_user_by_access_token(token)
# The token better still exist.
assert user_tuple
token_id = user_tuple.token_id
device_id = user_tuple.device_id

await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand Down
7 changes: 6 additions & 1 deletion synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class PusherConfig:

id: Optional[str]
user_name: str
access_token: Optional[int]

profile_tag: str
kind: str
app_id: str
Expand All @@ -119,6 +119,11 @@ class PusherConfig:
enabled: bool
device_id: Optional[str]

# XXX(quenting): The access_token is not persisted anymore for new pushers, but we
# keep it when reading from the database, so that we don't get stale pushers
# while the "set_device_id_for_pushers" background update is running.
access_token: Optional[int]

def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
return {
Expand Down
58 changes: 42 additions & 16 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.replication.http.push import ReplicationRemovePusherRestServlet
from synapse.types import JsonDict, RoomStreamToken
from synapse.types import JsonDict, RoomStreamToken, StrCollection
from synapse.util.async_helpers import concurrently_execute
from synapse.util.threepids import canonicalise_email

Expand Down Expand Up @@ -97,7 +97,6 @@ def start(self) -> None:
async def add_or_update_pusher(
self,
user_id: str,
access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
Expand Down Expand Up @@ -128,6 +127,22 @@ async def add_or_update_pusher(
# stream ordering, so it will process pushes from this point onwards.
last_stream_ordering = self.store.get_room_max_stream_ordering()

# Before we actually persist the pusher, we check if the user already has one
# for this app ID and pushkey. If so, we want to keep the access token and
# device ID in place, since this could be one device modifying
# (e.g. enabling/disabling) another device's pusher.
# XXX(quenting): Even though we're not persisting the access_token_id for new
# pushers anymore, we still need to copy existing access_token_ids over when
# updating a pusher, in case the "set_device_id_for_pushers" background update
# hasn't run yet.
access_token_id = None
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
device_id = existing_config.device_id
access_token_id = existing_config.access_token

# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
Expand All @@ -136,7 +151,6 @@ async def add_or_update_pusher(
PusherConfig(
id=None,
user_name=user_id,
access_token=access_token,
profile_tag=profile_tag,
kind=kind,
app_id=app_id,
Expand All @@ -151,23 +165,12 @@ async def add_or_update_pusher(
failing_since=None,
enabled=enabled,
device_id=device_id,
access_token=access_token_id,
)
)

# Before we actually persist the pusher, we check if the user already has one
# this app ID and pushkey. If so, we want to keep the access token and device ID
# in place, since this could be one device modifying (e.g. enabling/disabling)
# another device's pusher.
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
access_token = existing_config.access_token
device_id = existing_config.device_id

await self.store.add_pusher(
user_id=user_id,
access_token=access_token,
kind=kind,
app_id=app_id,
app_display_name=app_display_name,
Expand All @@ -180,6 +183,7 @@ async def add_or_update_pusher(
profile_tag=profile_tag,
enabled=enabled,
device_id=device_id,
access_token_id=access_token_id,
)
pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)

Expand All @@ -199,7 +203,7 @@ async def remove_pushers_by_app_id_and_pushkey_not_user(
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

async def remove_pushers_by_access_token(
async def remove_pushers_by_access_tokens(
self, user_id: str, access_tokens: Iterable[int]
) -> None:
"""Remove the pushers for a given user corresponding to a set of
Expand All @@ -209,6 +213,8 @@ async def remove_pushers_by_access_token(
user_id: user to remove pushers for
access_tokens: access token *ids* to remove pushers for
"""
# XXX(quenting): This is only needed until the "set_device_id_for_pushers"
# background update finishes
tokens = set(access_tokens)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.access_token in tokens:
Expand All @@ -220,6 +226,26 @@ async def remove_pushers_by_access_token(
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

async def remove_pushers_by_devices(
self, user_id: str, devices: StrCollection
) -> None:
"""Remove the pushers for a given user corresponding to a set of devices
Args:
user_id: user to remove pushers for
devices: device IDs to remove pushers for
"""
device_ids = set(devices)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.device_id in device_ids:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
p.app_id,
p.pushkey,
p.user_name,
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

def on_new_notifications(self, max_token: RoomStreamToken) -> None:
if not self.pushers:
# nothing to do here.
Expand Down
1 change: 0 additions & 1 deletion synapse/rest/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,6 @@ async def on_PUT(
):
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=None,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand Down
1 change: 0 additions & 1 deletion synapse/rest/client/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
try:
await self.pusher_pool.add_or_update_pusher(
user_id=user.to_string(),
access_token=requester.access_token_id,
kind=content["kind"],
app_id=content["app_id"],
app_display_name=content["app_display_name"],
Expand Down
40 changes: 31 additions & 9 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,19 +509,24 @@ def __init__(
async def _set_device_id_for_pushers(
self, progress: JsonDict, batch_size: int
) -> int:
"""Background update to populate the device_id column of the pushers table."""
"""
Background update to populate the device_id column and clear the access_token
column for the pushers table.
"""
last_pusher_id = progress.get("pusher_id", 0)

def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
txn.execute(
"""
SELECT p.id, at.device_id
SELECT
p.id AS pusher_id,
p.device_id AS pusher_device_id,
at.device_id AS token_device_id
FROM pushers AS p
INNER JOIN access_tokens AS at
LEFT JOIN access_tokens AS at
ON p.access_token = at.id
WHERE
p.access_token IS NOT NULL
AND at.device_id IS NOT NULL
AND p.id > ?
ORDER BY p.id
LIMIT ?
Expand All @@ -533,13 +538,27 @@ def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
if len(rows) == 0:
return 0

# The reason we're clearing the access_token column here is a bit subtle.
# When a user logs out, we:
# (1) delete the access token
# (2) delete the device
#
# Ideally, we would delete the pushers only via its link to the device
# during (2), but since this background update might not have fully run yet,
# we're still deleting the pushers via the access token during (1).
self.db_pool.simple_update_many_txn(
txn=txn,
table="pushers",
key_names=("id",),
key_values=[(row["id"],) for row in rows],
value_names=("device_id",),
value_values=[(row["device_id"],) for row in rows],
key_values=[(row["pusher_id"],) for row in rows],
value_names=("device_id", "access_token"),
# If there was already a device_id on the pusher, we only want to clear
# the access_token column, so we keep the existing device_id. Otherwise,
# we set the device_id we got from joining the access_tokens table.
value_values=[
(row["pusher_device_id"] or row["token_device_id"], None)
for row in rows
],
)

self.db_pool.updates._background_update_progress_txn(
Expand Down Expand Up @@ -568,7 +587,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
async def add_pusher(
self,
user_id: str,
access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
Expand All @@ -581,13 +599,13 @@ async def add_pusher(
profile_tag: str = "",
enabled: bool = True,
device_id: Optional[str] = None,
access_token_id: Optional[int] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
await self.db_pool.simple_upsert(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
values={
"access_token": access_token,
"kind": kind,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
Expand All @@ -599,6 +617,10 @@ async def add_pusher(
"id": stream_id,
"enabled": enabled,
"device_id": device_id,
# XXX(quenting): We're only really persisting the access token ID
# when updating an existing pusher. This is in case the
# 'set_device_id_for_pushers' background update hasn't finished yet.
"access_token": access_token_id,
},
desc="add_pusher",
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Triggers the background update to set the device_id for pushers
-- that don't have one, and clear the access_token column.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7402, 'set_device_id_for_pushers', '{}');
6 changes: 3 additions & 3 deletions tests/push/test_email.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.hs.get_datastores().main.get_user_by_access_token(self.access_token)
)
assert user_tuple is not None
self.token_id = user_tuple.token_id
self.device_id = user_tuple.device_id

# We need to add email to account before we can create a pusher.
self.get_success(
Expand All @@ -117,7 +117,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
pusher = self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.user_id,
access_token=self.token_id,
device_id=self.device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand All @@ -141,7 +141,7 @@ def test_need_validated_email(self) -> None:
self.get_success_or_raise(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.user_id,
access_token=self.token_id,
device_id=self.device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand Down
Loading

0 comments on commit 5b70f24

Please sign in to comment.