This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow use of the /filter Client-Server APIs on workers. #15134

Merged: 7 commits, Feb 28, 2023
1 change: 1 addition & 0 deletions changelog.d/15134.feature
@@ -0,0 +1 @@
Allow use of the `/filter` Client-Server APIs on workers.
Member:

I think we also need to update:

Member:

And yes this is annoying, see #12139.

1 change: 1 addition & 0 deletions docker/configure_workers_and_start.py
@@ -142,6 +142,7 @@
"^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
"^/_matrix/client/v1/rooms/.*/timestamp_to_event$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/search",
"^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
1 change: 1 addition & 0 deletions docs/workers.md
@@ -232,6 +232,7 @@ information.
^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms$
^/_matrix/client/v1/rooms/.*/timestamp_to_event$
^/_matrix/client/(api/v1|r0|v3|unstable)/search$
^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)
Member:

(Just starting a thread somewhere).

The REST servlet to add a filter:

  1. Checks the filter JSON against what's in the database; if an exact match is found, it returns the existing ID (i.e. you cannot have duplicate filters with different IDs).
  2. Otherwise, it finds the current maximum filter ID and increments it by 1.
  3. Inserts the new filter.

I think this change might break step 1 -- if we have two requests to insert a filter, they can both race and return no known filter in step 1, then both insert the filter with different IDs?
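The three steps can be modelled as a standalone sketch (sqlite3 standing in for the real database; the schema here is an assumption based on the `user_filters` table discussed in this thread, and in Synapse the whole flow runs inside one database transaction):

```python
import sqlite3

# Hypothetical standalone model of the servlet's three steps.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_filters ("
    " user_id TEXT NOT NULL,"
    " filter_id INTEGER NOT NULL,"
    " filter_json TEXT NOT NULL,"
    " UNIQUE (user_id, filter_id))"
)

def add_user_filter(user_id: str, filter_json: str) -> int:
    # Step 1: if an identical filter already exists, return its ID.
    row = conn.execute(
        "SELECT filter_id FROM user_filters WHERE user_id = ? AND filter_json = ?",
        (user_id, filter_json),
    ).fetchone()
    if row is not None:
        return row[0]
    # Step 2: find the user's current maximum filter ID and add one.
    (max_id,) = conn.execute(
        "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?",
        (user_id,),
    ).fetchone()
    new_id = (max_id or 0) + 1
    # Step 3: insert the new filter under the freshly allocated ID.
    conn.execute(
        "INSERT INTO user_filters VALUES (?, ?, ?)",
        (user_id, new_id, filter_json),
    )
    return new_id

print(add_user_filter("@user:example.org", '{"room": {}}'))  # 1
print(add_user_filter("@user:example.org", '{"room": {}}'))  # 1: exact match reused
print(add_user_filter("@user:example.org", "{}"))            # 2
```

Run sequentially this is safe; the race being described needs two of these flows interleaving between steps 1 and 3.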

Member:

(Note the only index is UNIQUE across (user_id, filter_id).)

Contributor Author:

You were almost right (I think) that there was a phantom write here. But funnily enough, step 1 won't be broken because step 2 will fix it. The reason I say this is:

  • we run transactions at the REPEATABLE READ isolation level by default and do not override the default for this txn
  • For your race condition to hold, Step 1 for both transactions needs to happen concurrently so neither sees the other's filter.
  • Both transactions will therefore see the same maximum filter ID in step 2 — because of REPEATABLE READ and step (1)
  • Both transactions will attempt to insert a filter with the same ID in step 3. One of them won't be able to.

When trying in psql, the second INSERT blocks until the first has committed, but then gives a unique key constraint violation. I don't think this is one of the errors we retry a transaction for, so maybe there is a problem here after all.

syn7=*> INSERT INTO user_filters (user_id, filter_id, filter_json) VALUES ('@user:syn7', 1, '{}');
ERROR:  duplicate key value violates unique constraint "user_filters_unique"
DETAIL:  Key (user_id, filter_id)=(@user:syn7, 1) already exists.
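The race can be reproduced deliberately in a single process (a hypothetical sqlite3 stand-in for the two concurrent Postgres transactions; sqlite raises the same DB-API `IntegrityError`):

```python
import sqlite3

# Hypothetical re-creation of the race: both requests get through steps 1 and 2
# before either inserts, so both allocate the same filter ID.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_filters (user_id TEXT, filter_id INTEGER, filter_json TEXT)"
)
conn.execute(
    "CREATE UNIQUE INDEX user_filters_unique ON user_filters (user_id, filter_id)"
)

def next_filter_id(user_id: str) -> int:
    (max_id,) = conn.execute(
        "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?",
        (user_id,),
    ).fetchone()
    return (max_id or 0) + 1

# Both "transactions" compute the next ID before either INSERT lands.
id_a = next_filter_id("@user:syn7")
id_b = next_filter_id("@user:syn7")
assert id_a == id_b == 1

conn.execute(
    "INSERT INTO user_filters VALUES (?, ?, ?)", ("@user:syn7", id_a, "{}")
)
try:
    conn.execute(
        "INSERT INTO user_filters VALUES (?, ?, ?)", ("@user:syn7", id_b, "{}")
    )
except sqlite3.IntegrityError as e:
    print("second insert rejected:", e)
```

Only the first insert survives; the second hits the unique index, mirroring the psql session above.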

Member:

Ah, yes good call with the isolation level we're using. 👍

I think we don't retry that, but I'm not sure. I guess using a sequence here would have been better? (Would that have solved our issues?)

Contributor Author (@reivilibre, Feb 24, 2023):

(to be fair, this exact same issue already existed if the user made two requests at once! A single worker can still run two transactions concurrently)

Contributor Author:

Yes using a SEQUENCE or a SERIAL field would have saved this effort. I guess the original author really liked the idea of a user's filters starting at 1 and then incrementing by 1, rather than having the same space of IDs shared by all users.
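As a sketch of that alternative (sqlite3's AUTOINCREMENT standing in for a Postgres SEQUENCE/SERIAL column; all names hypothetical):

```python
import sqlite3

# Hypothetical sketch: let the database allocate filter IDs from one shared,
# monotonically increasing counter. Concurrent inserts can no longer race on
# MAX(filter_id)+1, but IDs are shared across all users rather than starting
# at 1 for each user.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_filters ("
    " filter_id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " user_id TEXT NOT NULL,"
    " filter_json TEXT NOT NULL)"
)

def add_user_filter(user_id: str, filter_json: str) -> int:
    cur = conn.execute(
        "INSERT INTO user_filters (user_id, filter_json) VALUES (?, ?)",
        (user_id, filter_json),
    )
    return cur.lastrowid

print(add_user_filter("@alice:example.org", "{}"))  # 1
print(add_user_filter("@bob:example.org", "{}"))    # 2: one ID space for everyone
```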

Member:

Another way of thinking of this would have been randomly generating them, like we do in a few other spots. (Incrementing IDs are usually an API smell.)

Contributor Author:

(what's wrong with incrementing IDs?)

Member:

> (what's wrong with incrementing IDs?)

If they're scoped by user it is likely fine. If they're not then it can (potentially) leak information about other users (or your service as a whole). It also makes it easier to guess IDs.
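A minimal sketch of the random-ID approach (hypothetical names; `secrets` gives unguessable IDs, and a collision in a 128-bit space is astronomically unlikely):

```python
import secrets
import sqlite3

# Hypothetical sketch of randomly generated filter IDs: the ID leaks nothing
# about other users or overall volume, and the retry only fires on a
# vanishingly rare collision rather than on a guaranteed race.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_filters ("
    " user_id TEXT, filter_id TEXT, filter_json TEXT,"
    " UNIQUE (user_id, filter_id))"
)

def add_user_filter(user_id: str, filter_json: str) -> str:
    for _ in range(5):
        filter_id = secrets.token_hex(16)  # 128 random bits, hex-encoded
        try:
            conn.execute(
                "INSERT INTO user_filters VALUES (?, ?, ?)",
                (user_id, filter_id, filter_json),
            )
            return filter_id
        except sqlite3.IntegrityError:
            continue  # collision: try a fresh random ID
    raise RuntimeError("Couldn't generate a filter ID.")

fid = add_user_filter("@user:example.org", "{}")
print(len(fid))  # 32 hex characters
```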


# Encryption requests
^/_matrix/client/(r0|v3|unstable)/keys/query$
3 changes: 1 addition & 2 deletions synapse/rest/__init__.py
@@ -108,8 +108,7 @@ def register_servlets(client_resource: HttpServer, hs: "HomeServer") -> None:
     if is_main_process:
         logout.register_servlets(hs, client_resource)
     sync.register_servlets(hs, client_resource)
-    if is_main_process:
-        filter.register_servlets(hs, client_resource)
+    filter.register_servlets(hs, client_resource)
     account.register_servlets(hs, client_resource)
     register.register_servlets(hs, client_resource)
     if is_main_process:
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/__init__.py
@@ -43,7 +43,7 @@
 from .event_push_actions import EventPushActionsStore
 from .events_bg_updates import EventsBackgroundUpdatesStore
 from .events_forward_extremities import EventForwardExtremitiesStore
-from .filtering import FilteringStore
+from .filtering import FilteringWorkerStore
 from .keys import KeyStore
 from .lock import LockStore
 from .media_repository import MediaRepositoryStore
@@ -99,7 +99,7 @@ class DataStore(
     EventFederationStore,
     MediaRepositoryStore,
     RejectionsStore,
-    FilteringStore,
+    FilteringWorkerStore,
     PusherStore,
     PushRuleStore,
     ApplicationServiceTransactionStore,
25 changes: 21 additions & 4 deletions synapse/storage/databases/main/filtering.py
@@ -17,7 +17,7 @@
 
 from canonicaljson import encode_canonical_json
 
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, StoreError, SynapseError
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import LoggingTransaction
 from synapse.types import JsonDict
@@ -46,8 +46,6 @@ async def get_user_filter(
 
         return db_to_json(def_json)
 
-
-class FilteringStore(FilteringWorkerStore):
     async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> int:
         def_json = encode_canonical_json(user_filter)

@@ -79,4 +77,23 @@ def _do_txn(txn: LoggingTransaction) -> int:
 
             return filter_id
 
-        return await self.db_pool.runInteraction("add_user_filter", _do_txn)
+        attempts = 0
+        while True:
+            # Try a few times.
+            # This is technically needed if a user tries to create two filters at once,
+            # leading to two concurrent transactions.
+            # The failure case would be:
+            # - SELECT filter_id ... filter_json = ? → both transactions return no rows
+            # - SELECT MAX(filter_id) ... → both transactions return e.g. 5
+            # - INSERT INTO ... → both transactions insert filter_id = 6
+            # One of the transactions will commit. The other will get a unique key
+            # constraint violation error (IntegrityError). This is not the same as a
+            # serialisability violation, which would be automatically retried by
+            # `runInteraction`.
+            try:
+                return await self.db_pool.runInteraction("add_user_filter", _do_txn)
+            except self.db_pool.engine.module.IntegrityError:
+                attempts += 1
+
+                if attempts >= 5:
+                    raise StoreError(500, "Couldn't generate a filter ID.")
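The retry shape can be exercised outside Synapse with a synchronous sketch (sqlite3 standing in for Postgres; `IntegrityError` is the same DB-API exception the worker code catches, and all names here are hypothetical):

```python
import sqlite3

# Hypothetical synchronous model of the retry loop: the racy MAX+1 allocation
# stays, but a unique-constraint violation now triggers up to five retries
# instead of surfacing as an error to the client.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_filters ("
    " user_id TEXT, filter_id INTEGER, filter_json TEXT,"
    " UNIQUE (user_id, filter_id))"
)

def _do_txn(user_id: str, filter_json: str) -> int:
    (max_id,) = conn.execute(
        "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?",
        (user_id,),
    ).fetchone()
    filter_id = (max_id or 0) + 1
    conn.execute(
        "INSERT INTO user_filters VALUES (?, ?, ?)",
        (user_id, filter_id, filter_json),
    )
    return filter_id

def add_user_filter(user_id: str, filter_json: str) -> int:
    attempts = 0
    while True:
        try:
            return _do_txn(user_id, filter_json)
        except sqlite3.IntegrityError:
            attempts += 1
            if attempts >= 5:
                raise

print(add_user_filter("@user:example.org", "{}"))  # 1
print(add_user_filter("@user:example.org", "{}"))  # 2
```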