From 855af069a494f826ef941d722c811287b3fc4a8c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 15 Jan 2020 18:56:18 +0000 Subject: [PATCH 01/14] Fix instantiation of message retention purge jobs When figuring out which topological token to start a purge job at, we need to do the following: 1. Figure out a timestamp before which events will be purged 2. Select the first stream ordering after that timestamp 3. Select info about the first event after that stream ordering 4. Build a topological token from that info In some situations (e.g. quiet rooms with a short max_lifetime), there might not be an event after the stream ordering at step 3, therefore we abort the purge with the error `No event found`. To mitigate that, this patch fetches the first event _before_ the stream ordering, instead of after. --- synapse/handlers/pagination.py | 2 +- synapse/storage/data_stores/main/stream.py | 59 +++++++++++++++++----- 2 files changed, 48 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 00a6afc963d4..3ee6a091c528 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -156,7 +156,7 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms): stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts) - r = yield self.store.get_room_event_after_stream_ordering( + r = yield self.store.get_room_event_before_stream_ordering( room_id, stream_ordering, ) if not r: diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 140da8dad686..223ce7fedbda 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -536,20 +536,55 @@ def get_room_event_after_stream_ordering(self, room_id, stream_ordering): Deferred[(int, int, str)]: (stream ordering, topological ordering, event_id) """ + return self.db.runInteraction( + "get_room_event_after_stream_ordering", + self.get_room_event_around_stream_ordering_txn, + room_id, stream_ordering, "f", + ) - def _f(txn): - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering >= ?" - " AND NOT outlier" - " ORDER BY stream_ordering" - " LIMIT 1" - ) - txn.execute(sql, (room_id, stream_ordering)) - return txn.fetchone() + def get_room_event_before_stream_ordering(self, room_id, stream_ordering): + """Gets details of the first event in a room at or before a stream ordering + + Args: + room_id (str): + stream_ordering (int): + + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + return self.db.runInteraction( + "get_room_event_before_stream_ordering", + self.get_room_event_around_stream_ordering_txn, + room_id, stream_ordering, "f", + ) + + def get_room_event_around_stream_ordering_txn( + self, txn, room_id, stream_ordering, dir="f" + ): + """Gets details of the first event in a room at or either after or before a + stream ordering, depending on the provided direction. + + Args: + room_id (str): + stream_ordering (int): + dir (str): Direction in which we're looking towards in the room's history, + either "f" (forward) or "b" (backward). - return self.db.runInteraction("get_room_event_after_stream_ordering", _f) + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering %s ?" + " AND NOT outlier" + " ORDER BY stream_ordering" + " LIMIT 1" + ) % ("<=" if dir == "b" else ">=",) + txn.execute(sql, (room_id, stream_ordering)) + return txn.fetchone() @defer.inlineCallbacks def get_room_events_max_id(self, room_id=None): From 83635882379ecddb1509ea3d071eefdedefb647e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 15 Jan 2020 19:13:22 +0000 Subject: [PATCH 02/14] Fix typo --- synapse/storage/data_stores/main/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 223ce7fedbda..9fa5e1f203b1 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -556,7 +556,7 @@ def get_room_event_before_stream_ordering(self, room_id, stream_ordering): return self.db.runInteraction( "get_room_event_before_stream_ordering", self.get_room_event_around_stream_ordering_txn, - room_id, stream_ordering, "f", + room_id, stream_ordering, "b", ) def get_room_event_around_stream_ordering_txn( From 066b9f52b80c172eec6074ca01fb24670200fd80 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 15 Jan 2020 19:32:47 +0000 Subject: [PATCH 03/14] Correctly order when selecting before stream ordering --- synapse/storage/data_stores/main/stream.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 9fa5e1f203b1..451f38296bef 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -580,9 +580,12 @@ def get_room_event_around_stream_ordering_txn( " FROM events" " WHERE room_id = ? AND stream_ordering %s ?" " AND NOT outlier" - " ORDER BY stream_ordering" + " ORDER BY stream_ordering %s" " LIMIT 1" - ) % ("<=" if dir == "b" else ">=",) + ) % ( + "<=" if dir == "b" else ">=", + "DESC" if dir == "b" else "ASC", + ) txn.execute(sql, (room_id, stream_ordering)) return txn.fetchone() From 914e73cdd9053d6fd050e5ad04910db74a7b5cd9 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 15 Jan 2020 19:36:19 +0000 Subject: [PATCH 04/14] Changelog --- changelog.d/6713.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6713.bugfix diff --git a/changelog.d/6713.bugfix b/changelog.d/6713.bugfix new file mode 100644 index 000000000000..3924f1ad7967 --- /dev/null +++ b/changelog.d/6713.bugfix @@ -0,0 +1 @@ +Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies. From 48e57a6452be3fef4372832f9e8f8f630325a648 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 15 Jan 2020 19:40:46 +0000 Subject: [PATCH 05/14] Rename changelog --- changelog.d/{6713.bugfix => 6714.bugfix} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{6713.bugfix => 6714.bugfix} (100%) diff --git a/changelog.d/6713.bugfix b/changelog.d/6714.bugfix similarity index 100% rename from changelog.d/6713.bugfix rename to changelog.d/6714.bugfix From e601f35d3b562495b2f8b071bd4c812fd783d6a7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 09:55:11 +0000 Subject: [PATCH 06/14] Lint --- synapse/storage/data_stores/main/stream.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 451f38296bef..652cecd59b6e 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -539,7 +539,9 @@ def get_room_event_after_stream_ordering(self, room_id, stream_ordering): return self.db.runInteraction( "get_room_event_after_stream_ordering", self.get_room_event_around_stream_ordering_txn, - room_id, stream_ordering, "f", + room_id, + stream_ordering, + "f", ) def get_room_event_before_stream_ordering(self, room_id, stream_ordering): @@ -556,7 +558,9 @@ def get_room_event_before_stream_ordering(self, room_id, stream_ordering): return self.db.runInteraction( "get_room_event_before_stream_ordering", self.get_room_event_around_stream_ordering_txn, - room_id, stream_ordering, "b", + room_id, + stream_ordering, + "b", ) def get_room_event_around_stream_ordering_txn( @@ -575,6 +579,11 @@ def get_room_event_around_stream_ordering_txn( Deferred[(int, int, str)]: (stream ordering, topological ordering, event_id) """ + # Figure out which comparison operation to perform and how to order the results, + # using the provided direction. + op = "<=" if dir == "b" else ">=" + order = "DESC" if dir == "b" else "ASC" + sql = ( "SELECT stream_ordering, topological_ordering, event_id" " FROM events" @@ -582,10 +591,7 @@ def get_room_event_around_stream_ordering_txn( " AND NOT outlier" " ORDER BY stream_ordering %s" " LIMIT 1" - ) % ( - "<=" if dir == "b" else ">=", - "DESC" if dir == "b" else "ASC", - ) + ) % (op, order) txn.execute(sql, (room_id, stream_ordering)) return txn.fetchone() From 3098dfdf08ff14488808236e43d74823ac17399b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 14:09:50 +0000 Subject: [PATCH 07/14] Add more logging around message retention policies support So we can debug issues like #6683 more easily --- synapse/config/server.py | 8 ++++++++ synapse/handlers/pagination.py | 11 +++++++++++ 2 files changed, 19 insertions(+) diff --git a/synapse/config/server.py b/synapse/config/server.py index 9ac112233b0c..0ec1b0fadd78 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -294,6 +294,14 @@ def read_config(self, config, **kwargs): self.retention_default_min_lifetime = None self.retention_default_max_lifetime = None + if self.retention_enabled: + logger.info( + "Message retention policies support enabled with the following default" + " policy: min_lifetime = %s ; max_lifetime = %s", + self.retention_default_min_lifetime, + self.retention_default_max_lifetime, + ) + self.retention_allowed_lifetime_min = retention_config.get( "allowed_lifetime_min" ) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 3ee6a091c528..62fb69297d12 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -88,6 +88,8 @@ def __init__(self, hs): if hs.config.retention_enabled: # Run the purge jobs described in the configuration file. for job in hs.config.retention_purge_jobs: + logger.info("Setting up purge job with config: %s", job) + self.clock.looping_call( run_as_background_process, job["interval"], @@ -130,10 +132,19 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms): else: include_null = False + logger.info( + "[purge] Running purge job for range ] %d ; %d ] (include NULLs = %s", + min_ms, + max_ms, + include_null, + ) + rooms = yield self.store.get_rooms_for_retention_period_in_range( min_ms, max_ms, include_null ) + logger.debug("[purge] Rooms to purge: %s", rooms) + for room_id, retention_policy in iteritems(rooms): if room_id in self._purges_in_progress_by_room: logger.warning( From f03ab4534524a3fd3755f8f09a26711aa985fba3 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 14:19:08 +0000 Subject: [PATCH 08/14] Add another log --- synapse/handlers/pagination.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 62fb69297d12..7f18fe5cd225 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -146,6 +146,8 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms): logger.debug("[purge] Rooms to purge: %s", rooms) for room_id, retention_policy in iteritems(rooms): + logger.info("[purge] Attempting to purge messages in room %s", room_id) + if room_id in self._purges_in_progress_by_room: logger.warning( "[purge] not purging room %s as there's an ongoing purge running" From 4087427f045ff137a0b58062ca6c1af55dc07d6a Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 20:16:57 +0000 Subject: [PATCH 09/14] Typo --- synapse/handlers/pagination.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 7f18fe5cd225..9dc02eab6972 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -133,7 +133,7 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms): include_null = False logger.info( - "[purge] Running purge job for range ] %d ; %d ] (include NULLs = %s", + "[purge] Running purge job for range ] %d ; %d ] (include NULLs = %s)", min_ms, max_ms, include_null, From 842c2cfbf1e9f3e0d9251fa0c572eba9d6af6dbe Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 20:24:17 +0000 Subject: [PATCH 10/14] Remove get_room_event_after_stream_ordering entirely --- synapse/rest/admin/__init__.py | 2 +- synapse/storage/data_stores/main/stream.py | 69 ++++------------------ 2 files changed, 13 insertions(+), 58 deletions(-) diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index a10b4a9b7263..2932fe2123d8 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -107,7 +107,7 @@ async def on_POST(self, request, room_id, event_id): stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts) - r = await self.store.get_room_event_after_stream_ordering( + r = await self.store.get_room_event_before_stream_ordering( room_id, stream_ordering ) if not r: diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 652cecd59b6e..a20c3d1012f8 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -525,25 +525,6 @@ def get_recent_event_ids_for_room(self, room_id, limit, end_token): return rows, token - def get_room_event_after_stream_ordering(self, room_id, stream_ordering): - """Gets details of the first event in a room at or after a stream ordering - - Args: - room_id (str): - stream_ordering (int): - - Returns: - Deferred[(int, int, str)]: - (stream ordering, topological ordering, event_id) - """ - return self.db.runInteraction( - "get_room_event_after_stream_ordering", - self.get_room_event_around_stream_ordering_txn, - room_id, - stream_ordering, - "f", - ) - def get_room_event_before_stream_ordering(self, room_id, stream_ordering): """Gets details of the first event in a room at or before a stream ordering @@ -555,45 +536,19 @@ def get_room_event_before_stream_ordering(self, room_id, stream_ordering): Deferred[(int, int, str)]: (stream ordering, topological ordering, event_id) """ - return self.db.runInteraction( - "get_room_event_before_stream_ordering", - self.get_room_event_around_stream_ordering_txn, - room_id, - stream_ordering, - "b", - ) - - def get_room_event_around_stream_ordering_txn( - self, txn, room_id, stream_ordering, dir="f" - ): - """Gets details of the first event in a room at or either after or before a - stream ordering, depending on the provided direction. - - Args: - room_id (str): - stream_ordering (int): - dir (str): Direction in which we're looking towards in the room's history, - either "f" (forward) or "b" (backward). - - Returns: - Deferred[(int, int, str)]: - (stream ordering, topological ordering, event_id) - """ - # Figure out which comparison operation to perform and how to order the results, - # using the provided direction. - op = "<=" if dir == "b" else ">=" - order = "DESC" if dir == "b" else "ASC" + def _f(txn): + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering <= ?" + " AND NOT outlier" + " ORDER BY stream_ordering DESC" + " LIMIT 1" + ) + txn.execute(sql, (room_id, stream_ordering)) + return txn.fetchone() - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering %s ?" - " AND NOT outlier" - " ORDER BY stream_ordering %s" - " LIMIT 1" - ) % (op, order) - txn.execute(sql, (room_id, stream_ordering)) - return txn.fetchone() + return self.db.runInteraction("get_room_event_before_stream_ordering", _f) @defer.inlineCallbacks def get_room_events_max_id(self, room_id=None): From dac148341ba2638cc9486cf0b00005932dab939d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 20:25:09 +0000 Subject: [PATCH 11/14] Fixup diff --- synapse/storage/data_stores/main/stream.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index a20c3d1012f8..056b25b13a3f 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -536,14 +536,15 @@ def get_room_event_before_stream_ordering(self, room_id, stream_ordering): Deferred[(int, int, str)]: (stream ordering, topological ordering, event_id) """ + def _f(txn): sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering <= ?" - " AND NOT outlier" - " ORDER BY stream_ordering DESC" - " LIMIT 1" + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering <= ?" + " AND NOT outlier" + " ORDER BY stream_ordering DESC" + " LIMIT 1" ) txn.execute(sql, (room_id, stream_ordering)) return txn.fetchone() From 4fb3cb208a17ba36a5da050b19e3997cf4808f9a Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 20:27:07 +0000 Subject: [PATCH 12/14] Precise changelog --- changelog.d/6714.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/6714.bugfix b/changelog.d/6714.bugfix index 3924f1ad7967..410516694fc6 100644 --- a/changelog.d/6714.bugfix +++ b/changelog.d/6714.bugfix @@ -1 +1 @@ -Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies. +Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies when running the automated purge jobs. From 951211f157762fdedcaf26deb6055ed3c068b144 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 17 Jan 2020 11:37:33 +0000 Subject: [PATCH 13/14] Use a less controversial notation for intervals --- synapse/handlers/pagination.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 9dc02eab6972..71d76202c93d 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -133,7 +133,7 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms): include_null = False logger.info( - "[purge] Running purge job for range ] %d ; %d ] (include NULLs = %s)", + "[purge] Running purge job for %d < max_lifetime <= %d (include NULLs = %s)", min_ms, max_ms, include_null, From 9a70beee7ec654d4c7039db59877798c28938ea6 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 17 Jan 2020 14:31:45 +0000 Subject: [PATCH 14/14] Changelog --- changelog.d/6717.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6717.misc diff --git a/changelog.d/6717.misc b/changelog.d/6717.misc new file mode 100644 index 000000000000..a2a7776126b0 --- /dev/null +++ b/changelog.d/6717.misc @@ -0,0 +1 @@ +Add more logging around message retention policies support.