-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix message duplication if something goes wrong after persisting the event #8476
Conversation
if token_id and txn_id: | ||
existing = await self.get_event_id_from_transaction_id( | ||
event.sender, token_id, txn_id | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if we want to try and batch these up a bit? The standard simple_select_many
doesn't work due to it only supporting one column. There are psycopg2 helper functions that might help us though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like something similar could be made which takes an iterable of tuples (or dicts)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I'm just not sure how much I want to try and do that for something that is going in an RC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
park it for now, optimise later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I fully understand the ramifications of this change, but I left a bunch of questions. Overall it seems OK.
@@ -130,6 +129,15 @@ def __init__(self, database: DatabasePool, db_conn, hs): | |||
db_conn, "events", "stream_ordering", step=-1 | |||
) | |||
|
|||
if not hs.config.worker.worker_app: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can probably be put on the background worker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ugh, yes but that work isn't in 1.21 :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, right. 🤦 I'll update it when this gets merged forward.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if we're retargetting this to develop, seems like we can make this change!
if token_id and txn_id: | ||
existing = await self.get_event_id_from_transaction_id( | ||
event.sender, token_id, txn_id | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like something similar could be made which takes an iterable of tuples (or dicts)?
DELETE FROM event_txn_id | ||
WHERE inserted_ts < ? | ||
""" | ||
one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One day ago is essentially the largest you could get behind on federation and still not have duplicates?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only get transaction IDs from local clients, who I believe shouldn't retry if significant time has passed.
|
||
def _maybe_start_persisting(self, room_id: str): | ||
async def persisting_queue(item): | ||
with Measure(self._clock, "persist_events"): | ||
await self._persist_events( | ||
return await self._persist_events( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit confusing, but this seems to be the change that means the Deferred
returned via add_to_queue
gets called with a value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup
Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the code looks reasonable, I don't have a good feel for:
- Any performance ramifications due to these changes.
- The failure modes of the current design.
-- A map of recent events persisted with transaction IDs. Used to deduplicate | ||
-- send event requests with the same transaction ID. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be nice to mention what these are expected to be deduplicated across (user, device, transaction).
I think most of the other tables we have use the device_id
, not the access token ID? Are these mostly synonymous?
token_id = getattr(event.internal_metadata, "token_id", None) | ||
txn_id = getattr(event.internal_metadata, "txn_id", None) | ||
if token_id and txn_id: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
slightly feeling that it shouldn't be the storage layer's responsibility to do this digging, but ymmv
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeeeah, though this only gets called from persist_events
so we'd have to thread this through persist_events
if we wanted to move the logic out of storage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
Causes the deferreds returned by `add_to_queue` to resolve with: a | ||
dictionary of event ID to event ID we didn't persist as we already had | ||
another event persisted with the same TXN ID. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably needs to go on add_to_queue
, not here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added some words to add_to_queue
that we pass through return values. I find it a bit tricky as the class is currently relatively generic, and so adding details about specific usage feels a bit weird (event if its only used in one place)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair enough, but this still needs to be translated into English :-p
Causes the deferreds returned by `add_to_queue` to resolve with: a | |
dictionary of event ID to event ID we didn't persist as we already had | |
another event persisted with the same TXN ID. | |
Once the events are persisted, the Deferreds returned by `add_to_queue` | |
will complete. Each Deferred resolves with a `Dict[str, str]`, where each entry | |
indicates an event that was not persisted due to it having the same txn ID as | |
an existing event, and maps from the ID of the dropped event, to the ID of | |
the existing event. |
if token_id and txn_id: | ||
existing = await self.get_event_id_from_transaction_id( | ||
event.sender, token_id, txn_id | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
park it for now, optimise later?
token_id = getattr(event.internal_metadata, "token_id", None) | ||
txn_id = getattr(event.internal_metadata, "txn_id", None) | ||
if token_id and txn_id: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
replaced_events = await self.main_store.get_already_persisted_events( | ||
(event for event, _ in events_and_contexts) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, I might have paged this out, but can we be sure that events_and_contexts
doesn't itself contain duplicate txn ids?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, hmm, I don't think it should happen because we linearize based on txn ID elsewhere, but we should add a check anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Synapse 1.22.0rc1 (2020-10-22) ============================== Features -------- - Add a configuration option for always using the "userinfo endpoint" for OpenID Connect. This fixes support for some identity providers, e.g. GitLab. Contributed by Benjamin Koch. ([\#7658](#7658)) - Add ability for `ThirdPartyEventRules` modules to query and manipulate whether a room is in the public rooms directory. ([\#8292](#8292), [\#8467](#8467)) - Add support for olm fallback keys ([MSC2732](matrix-org/matrix-spec-proposals#2732)). ([\#8312](#8312), [\#8501](#8501)) - Add support for running background tasks in a separate worker process. ([\#8369](#8369), [\#8458](#8458), [\#8489](#8489), [\#8513](#8513), [\#8544](#8544), [\#8599](#8599)) - Add support for device dehydration ([MSC2697](matrix-org/matrix-spec-proposals#2697)). ([\#8380](#8380)) - Add support for [MSC2409](matrix-org/matrix-spec-proposals#2409), which allows sending typing, read receipts, and presence events to appservices. ([\#8437](#8437), [\#8590](#8590)) - Change default room version to "6", per [MSC2788](matrix-org/matrix-spec-proposals#2788). ([\#8461](#8461)) - Add the ability to send non-membership events into a room via the `ModuleApi`. ([\#8479](#8479)) - Increase default upload size limit from 10M to 50M. Contributed by @Akkowicz. ([\#8502](#8502)) - Add support for modifying event content in `ThirdPartyRules` modules. ([\#8535](#8535), [\#8564](#8564)) Bugfixes -------- - Fix a longstanding bug where invalid ignored users in account data could break clients. ([\#8454](#8454)) - Fix a bug where backfilling a room with an event that was missing the `redacts` field would break. ([\#8457](#8457)) - Don't attempt to respond to some requests if the client has already disconnected. ([\#8465](#8465)) - Fix message duplication if something goes wrong after persisting the event. ([\#8476](#8476)) - Fix incremental sync returning an incorrect `prev_batch` token in timeline section, which when used to paginate returned events that were included in the incremental sync. Broken since v0.16.0. ([\#8486](#8486)) - Expose the `uk.half-shot.msc2778.login.application_service` to clients from the login API. This feature was added in v1.21.0, but was not exposed as a potential login flow. ([\#8504](#8504)) - Fix error code for `/profile/{userId}/displayname` to be `M_BAD_JSON`. ([\#8517](#8517)) - Fix a bug introduced in v1.7.0 that could cause Synapse to insert values from non-state `m.room.retention` events into the `room_retention` database table. ([\#8527](#8527)) - Fix not sending events over federation when using sharded event writers. ([\#8536](#8536)) - Fix a long standing bug where email notifications for encrypted messages were blank. ([\#8545](#8545)) - Fix increase in the number of `There was no active span...` errors logged when using OpenTracing. ([\#8567](#8567)) - Fix a bug that prevented errors encountered during execution of the `synapse_port_db` from being correctly printed. ([\#8585](#8585)) - Fix appservice transactions to only include a maximum of 100 persistent and 100 ephemeral events. ([\#8606](#8606)) Updates to the Docker image --------------------------- - Added multi-arch support (arm64,arm/v7) for the docker images. Contributed by @maquis196. ([\#7921](#7921)) - Add support for passing commandline args to the synapse process. Contributed by @samuel-p. ([\#8390](#8390)) Improved Documentation ---------------------- - Update the directions for using the manhole with coroutines. ([\#8462](#8462)) - Improve readme by adding new shield.io badges. ([\#8493](#8493)) - Added note about docker in manhole.md regarding which ip address to bind to. Contributed by @maquis196. ([\#8526](#8526)) - Document the new behaviour of the `allowed_lifetime_min` and `allowed_lifetime_max` settings in the room retention configuration. ([\#8529](#8529)) Deprecations and Removals ------------------------- - Drop unused `device_max_stream_id` table. ([\#8589](#8589)) Internal Changes ---------------- - Check for unreachable code with mypy. ([\#8432](#8432)) - Add unit test for event persister sharding. ([\#8433](#8433)) - Allow events to be sent to clients sooner when using sharded event persisters. ([\#8439](#8439), [\#8488](#8488), [\#8496](#8496), [\#8499](#8499)) - Configure `public_baseurl` when using demo scripts. ([\#8443](#8443)) - Add SQL logging on queries that happen during startup. ([\#8448](#8448)) - Speed up unit tests when using PostgreSQL. ([\#8450](#8450)) - Remove redundant database loads of stream_ordering for events we already have. ([\#8452](#8452)) - Reduce inconsistencies between codepaths for membership and non-membership events. ([\#8463](#8463)) - Combine `SpamCheckerApi` with the more generic `ModuleApi`. ([\#8464](#8464)) - Additional testing for `ThirdPartyEventRules`. ([\#8468](#8468)) - Add `-d` option to `./scripts-dev/lint.sh` to lint files that have changed since the last git commit. ([\#8472](#8472)) - Unblacklist some sytests. ([\#8474](#8474)) - Include the log level in the phone home stats. ([\#8477](#8477)) - Remove outdated sphinx documentation, scripts and configuration. ([\#8480](#8480)) - Clarify error message when plugin config parsers raise an error. ([\#8492](#8492)) - Remove the deprecated `Handlers` object. ([\#8494](#8494)) - Fix a threadsafety bug in unit tests. ([\#8497](#8497)) - Add user agent to user_daily_visits table. ([\#8503](#8503)) - Add type hints to various parts of the code base. ([\#8407](#8407), [\#8505](#8505), [\#8507](#8507), [\#8547](#8547), [\#8562](#8562), [\#8609](#8609)) - Remove unused code from the test framework. ([\#8514](#8514)) - Apply some internal fixes to the `HomeServer` class to make its code more idiomatic and statically-verifiable. ([\#8515](#8515)) - Factor out common code between `RoomMemberHandler._locally_reject_invite` and `EventCreationHandler.create_event`. ([\#8537](#8537)) - Improve database performance by executing more queries without starting transactions. ([\#8542](#8542)) - Rename `Cache` to `DeferredCache`, to better reflect its purpose. ([\#8548](#8548)) - Move metric registration code down into `LruCache`. ([\#8561](#8561), [\#8591](#8591)) - Replace `DeferredCache` with the lighter-weight `LruCache` where possible. ([\#8563](#8563)) - Add virtualenv-generated folders to `.gitignore`. ([\#8566](#8566)) - Add `get_immediate` method to `DeferredCache`. ([\#8568](#8568)) - Fix mypy not properly checking across the codebase, additionally, fix a typing assertion error in `handlers/auth.py`. ([\#8569](#8569)) - Fix `synmark` benchmark runner. ([\#8571](#8571)) - Modify `DeferredCache.get()` to return `Deferred`s instead of `ObservableDeferred`s. ([\#8572](#8572)) - Adjust a protocol-type definition to fit `sqlite3` assertions. ([\#8577](#8577)) - Support macOS on the `synmark` benchmark runner. ([\#8578](#8578)) - Update `mypy` static type checker to 0.790. ([\#8583](#8583), [\#8600](#8600)) - Re-organize the structured logging code to separate the TCP transport handling from the JSON formatting. ([\#8587](#8587)) - Remove extraneous unittest logging decorators from unit tests. ([\#8592](#8592)) - Minor optimisations in caching code. ([\#8593](#8593), [\#8594](#8594))
Synapse 1.22.0 (2020-10-27) =========================== No significant changes. Synapse 1.22.0rc2 (2020-10-26) ============================== Bugfixes -------- - Fix bugs where ephemeral events were not sent to appservices. Broke in v1.22.0rc1. ([\#8648](matrix-org/synapse#8648), [\#8656](matrix-org/synapse#8656)) - Fix `user_daily_visits` table to not have duplicate rows per user/device due to multiple user agents. Broke in v1.22.0rc1. ([\#8654](matrix-org/synapse#8654)) Synapse 1.22.0rc1 (2020-10-22) ============================== Features -------- - Add a configuration option for always using the "userinfo endpoint" for OpenID Connect. This fixes support for some identity providers, e.g. GitLab. Contributed by Benjamin Koch. ([\#7658](matrix-org/synapse#7658)) - Add ability for `ThirdPartyEventRules` modules to query and manipulate whether a room is in the public rooms directory. ([\#8292](matrix-org/synapse#8292), [\#8467](matrix-org/synapse#8467)) - Add support for olm fallback keys ([MSC2732](matrix-org/matrix-spec-proposals#2732)). ([\#8312](matrix-org/synapse#8312), [\#8501](matrix-org/synapse#8501)) - Add support for running background tasks in a separate worker process. ([\#8369](matrix-org/synapse#8369), [\#8458](matrix-org/synapse#8458), [\#8489](matrix-org/synapse#8489), [\#8513](matrix-org/synapse#8513), [\#8544](matrix-org/synapse#8544), [\#8599](matrix-org/synapse#8599)) - Add support for device dehydration ([MSC2697](matrix-org/matrix-spec-proposals#2697)). ([\#8380](matrix-org/synapse#8380)) - Add support for [MSC2409](matrix-org/matrix-spec-proposals#2409), which allows sending typing, read receipts, and presence events to appservices. ([\#8437](matrix-org/synapse#8437), [\#8590](matrix-org/synapse#8590)) - Change default room version to "6", per [MSC2788](matrix-org/matrix-spec-proposals#2788). ([\#8461](matrix-org/synapse#8461)) - Add the ability to send non-membership events into a room via the `ModuleApi`. ([\#8479](matrix-org/synapse#8479)) - Increase default upload size limit from 10M to 50M. Contributed by @Akkowicz. ([\#8502](matrix-org/synapse#8502)) - Add support for modifying event content in `ThirdPartyRules` modules. ([\#8535](matrix-org/synapse#8535), [\#8564](matrix-org/synapse#8564)) Bugfixes -------- - Fix a longstanding bug where invalid ignored users in account data could break clients. ([\#8454](matrix-org/synapse#8454)) - Fix a bug where backfilling a room with an event that was missing the `redacts` field would break. ([\#8457](matrix-org/synapse#8457)) - Don't attempt to respond to some requests if the client has already disconnected. ([\#8465](matrix-org/synapse#8465)) - Fix message duplication if something goes wrong after persisting the event. ([\#8476](matrix-org/synapse#8476)) - Fix incremental sync returning an incorrect `prev_batch` token in timeline section, which when used to paginate returned events that were included in the incremental sync. Broken since v0.16.0. ([\#8486](matrix-org/synapse#8486)) - Expose the `uk.half-shot.msc2778.login.application_service` to clients from the login API. This feature was added in v1.21.0, but was not exposed as a potential login flow. ([\#8504](matrix-org/synapse#8504)) - Fix error code for `/profile/{userId}/displayname` to be `M_BAD_JSON`. ([\#8517](matrix-org/synapse#8517)) - Fix a bug introduced in v1.7.0 that could cause Synapse to insert values from non-state `m.room.retention` events into the `room_retention` database table. ([\#8527](matrix-org/synapse#8527)) - Fix not sending events over federation when using sharded event writers. ([\#8536](matrix-org/synapse#8536)) - Fix a long standing bug where email notifications for encrypted messages were blank. ([\#8545](matrix-org/synapse#8545)) - Fix increase in the number of `There was no active span...` errors logged when using OpenTracing. ([\#8567](matrix-org/synapse#8567)) - Fix a bug that prevented errors encountered during execution of the `synapse_port_db` from being correctly printed. ([\#8585](matrix-org/synapse#8585)) - Fix appservice transactions to only include a maximum of 100 persistent and 100 ephemeral events. ([\#8606](matrix-org/synapse#8606)) Updates to the Docker image --------------------------- - Added multi-arch support (arm64,arm/v7) for the docker images. Contributed by @maquis196. ([\#7921](matrix-org/synapse#7921)) - Add support for passing commandline args to the synapse process. Contributed by @samuel-p. ([\#8390](matrix-org/synapse#8390)) Improved Documentation ---------------------- - Update the directions for using the manhole with coroutines. ([\#8462](matrix-org/synapse#8462)) - Improve readme by adding new shield.io badges. ([\#8493](matrix-org/synapse#8493)) - Added note about docker in manhole.md regarding which ip address to bind to. Contributed by @maquis196. ([\#8526](matrix-org/synapse#8526)) - Document the new behaviour of the `allowed_lifetime_min` and `allowed_lifetime_max` settings in the room retention configuration. ([\#8529](matrix-org/synapse#8529)) Deprecations and Removals ------------------------- - Drop unused `device_max_stream_id` table. ([\#8589](matrix-org/synapse#8589)) Internal Changes ---------------- - Check for unreachable code with mypy. ([\#8432](matrix-org/synapse#8432)) - Add unit test for event persister sharding. ([\#8433](matrix-org/synapse#8433)) - Allow events to be sent to clients sooner when using sharded event persisters. ([\#8439](matrix-org/synapse#8439), [\#8488](matrix-org/synapse#8488), [\#8496](matrix-org/synapse#8496), [\#8499](matrix-org/synapse#8499)) - Configure `public_baseurl` when using demo scripts. ([\#8443](matrix-org/synapse#8443)) - Add SQL logging on queries that happen during startup. ([\#8448](matrix-org/synapse#8448)) - Speed up unit tests when using PostgreSQL. ([\#8450](matrix-org/synapse#8450)) - Remove redundant database loads of stream_ordering for events we already have. ([\#8452](matrix-org/synapse#8452)) - Reduce inconsistencies between codepaths for membership and non-membership events. ([\#8463](matrix-org/synapse#8463)) - Combine `SpamCheckerApi` with the more generic `ModuleApi`. ([\#8464](matrix-org/synapse#8464)) - Additional testing for `ThirdPartyEventRules`. ([\#8468](matrix-org/synapse#8468)) - Add `-d` option to `./scripts-dev/lint.sh` to lint files that have changed since the last git commit. ([\#8472](matrix-org/synapse#8472)) - Unblacklist some sytests. ([\#8474](matrix-org/synapse#8474)) - Include the log level in the phone home stats. ([\#8477](matrix-org/synapse#8477)) - Remove outdated sphinx documentation, scripts and configuration. ([\#8480](matrix-org/synapse#8480)) - Clarify error message when plugin config parsers raise an error. ([\#8492](matrix-org/synapse#8492)) - Remove the deprecated `Handlers` object. ([\#8494](matrix-org/synapse#8494)) - Fix a threadsafety bug in unit tests. ([\#8497](matrix-org/synapse#8497)) - Add user agent to user_daily_visits table. ([\#8503](matrix-org/synapse#8503)) - Add type hints to various parts of the code base. ([\#8407](matrix-org/synapse#8407), [\#8505](matrix-org/synapse#8505), [\#8507](matrix-org/synapse#8507), [\#8547](matrix-org/synapse#8547), [\#8562](matrix-org/synapse#8562), [\#8609](matrix-org/synapse#8609)) - Remove unused code from the test framework. ([\#8514](matrix-org/synapse#8514)) - Apply some internal fixes to the `HomeServer` class to make its code more idiomatic and statically-verifiable. ([\#8515](matrix-org/synapse#8515)) - Factor out common code between `RoomMemberHandler._locally_reject_invite` and `EventCreationHandler.create_event`. ([\#8537](matrix-org/synapse#8537)) - Improve database performance by executing more queries without starting transactions. ([\#8542](matrix-org/synapse#8542)) - Rename `Cache` to `DeferredCache`, to better reflect its purpose. ([\#8548](matrix-org/synapse#8548)) - Move metric registration code down into `LruCache`. ([\#8561](matrix-org/synapse#8561), [\#8591](matrix-org/synapse#8591)) - Replace `DeferredCache` with the lighter-weight `LruCache` where possible. ([\#8563](matrix-org/synapse#8563)) - Add virtualenv-generated folders to `.gitignore`. ([\#8566](matrix-org/synapse#8566)) - Add `get_immediate` method to `DeferredCache`. ([\#8568](matrix-org/synapse#8568)) - Fix mypy not properly checking across the codebase, additionally, fix a typing assertion error in `handlers/auth.py`. ([\#8569](matrix-org/synapse#8569)) - Fix `synmark` benchmark runner. ([\#8571](matrix-org/synapse#8571)) - Modify `DeferredCache.get()` to return `Deferred`s instead of `ObservableDeferred`s. ([\#8572](matrix-org/synapse#8572)) - Adjust a protocol-type definition to fit `sqlite3` assertions. ([\#8577](matrix-org/synapse#8577)) - Support macOS on the `synmark` benchmark runner. ([\#8578](matrix-org/synapse#8578)) - Update `mypy` static type checker to 0.790. ([\#8583](matrix-org/synapse#8583), [\#8600](matrix-org/synapse#8600)) - Re-organize the structured logging code to separate the TCP transport handling from the JSON formatting. ([\#8587](matrix-org/synapse#8587)) - Remove extraneous unittest logging decorators from unit tests. ([\#8592](matrix-org/synapse#8592)) - Minor optimisations in caching code. ([\#8593](matrix-org/synapse#8593), [\#8594](matrix-org/synapse#8594))
Should fix #3365.