
Add more logging around message retention policies support #6717

Merged
merged 15 commits on Jan 17, 2020
1 change: 1 addition & 0 deletions changelog.d/6714.bugfix
@@ -0,0 +1 @@
Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies when running the automated purge jobs.
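
For context only (not part of this change): per-room retention policies are set via an `m.room.retention` state event, and a "low `max_lifetime`" here means a short lifetime expressed in milliseconds. A hypothetical example of such a policy:

    # Illustrative only: a hypothetical per-room retention policy with a low
    # max_lifetime (values are in milliseconds), the kind of room affected by
    # the bug described above.
    retention_policy_event = {
        "type": "m.room.retention",
        "state_key": "",
        "content": {
            "max_lifetime": 24 * 60 * 60 * 1000,  # purge events older than one day
        },
    }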
1 change: 1 addition & 0 deletions changelog.d/6717.misc
@@ -0,0 +1 @@
Add more logging around message retention policies support.
8 changes: 8 additions & 0 deletions synapse/config/server.py
@@ -294,6 +294,14 @@ def read_config(self, config, **kwargs):
         self.retention_default_min_lifetime = None
         self.retention_default_max_lifetime = None
 
+        if self.retention_enabled:
+            logger.info(
+                "Message retention policies support enabled with the following default"
+                " policy: min_lifetime = %s ; max_lifetime = %s",
+                self.retention_default_min_lifetime,
+                self.retention_default_max_lifetime,
+            )
+
         self.retention_allowed_lifetime_min = retention_config.get(
             "allowed_lifetime_min"
         )
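As a rough illustration (the lifetime values below are hypothetical; Synapse stores them in milliseconds, and either may be None if the default policy leaves it unset), the new logger.info call produces a startup line along these lines:

    # Minimal sketch reproducing the new startup log line with made-up values;
    # not Synapse's actual config-parsing code.
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("synapse.config.server")

    retention_enabled = True
    retention_default_min_lifetime = None  # default policy sets no minimum
    retention_default_max_lifetime = 30 * 24 * 3600 * 1000  # e.g. 30 days, in milliseconds

    if retention_enabled:
        logger.info(
            "Message retention policies support enabled with the following default"
            " policy: min_lifetime = %s ; max_lifetime = %s",
            retention_default_min_lifetime,
            retention_default_max_lifetime,
        )
    # -> ... min_lifetime = None ; max_lifetime = 2592000000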
15 changes: 14 additions & 1 deletion synapse/handlers/pagination.py
@@ -88,6 +88,8 @@ def __init__(self, hs):
         if hs.config.retention_enabled:
             # Run the purge jobs described in the configuration file.
             for job in hs.config.retention_purge_jobs:
+                logger.info("Setting up purge job with config: %s", job)
+
                 self.clock.looping_call(
                     run_as_background_process,
                     job["interval"],
@@ -130,11 +132,22 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms):
         else:
             include_null = False
 
+        logger.info(
+            "[purge] Running purge job for %d < max_lifetime <= %d (include NULLs = %s)",
+            min_ms,
+            max_ms,
+            include_null,
+        )
+
         rooms = yield self.store.get_rooms_for_retention_period_in_range(
             min_ms, max_ms, include_null
         )
 
+        logger.debug("[purge] Rooms to purge: %s", rooms)
+
         for room_id, retention_policy in iteritems(rooms):
+            logger.info("[purge] Attempting to purge messages in room %s", room_id)
+
             if room_id in self._purges_in_progress_by_room:
                 logger.warning(
                     "[purge] not purging room %s as there's an ongoing purge running"
@@ -156,7 +169,7 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms):
 
             stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
 
-            r = yield self.store.get_room_event_after_stream_ordering(
+            r = yield self.store.get_room_event_before_stream_ordering(
                 room_id, stream_ordering,
             )
             if not r:
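To show roughly what the new purge-job logging emits, here is a hedged sketch; the shape of the job dictionary and the store result below are assumptions for illustration, not taken from this diff:

    # Hedged sketch: replay the added log calls with hypothetical data to show
    # the kind of output they produce. Not Synapse code.
    import logging

    logging.basicConfig(level=logging.DEBUG, format="%(message)s")
    logger = logging.getLogger("synapse.handlers.pagination")

    # Hypothetical purge job (interval and lifetimes in milliseconds).
    job = {"interval": 43200000, "shortest_max_lifetime": None, "longest_max_lifetime": 259200000}
    logger.info("Setting up purge job with config: %s", job)

    min_ms, max_ms, include_null = 0, 259200000, False
    logger.info(
        "[purge] Running purge job for %d < max_lifetime <= %d (include NULLs = %s)",
        min_ms,
        max_ms,
        include_null,
    )

    # Hypothetical result from the retention-period store lookup.
    rooms = {"!quiet:example.com": {"min_lifetime": None, "max_lifetime": 86400000}}
    logger.debug("[purge] Rooms to purge: %s", rooms)

    for room_id in rooms:
        logger.info("[purge] Attempting to purge messages in room %s", room_id)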
2 changes: 1 addition & 1 deletion synapse/rest/admin/__init__.py
@@ -107,7 +107,7 @@ async def on_POST(self, request, room_id, event_id):
 
         stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
 
-        r = await self.store.get_room_event_after_stream_ordering(
+        r = await self.store.get_room_event_before_stream_ordering(
             room_id, stream_ordering
         )
         if not r:
10 changes: 5 additions & 5 deletions synapse/storage/data_stores/main/stream.py
@@ -525,8 +525,8 @@ def get_recent_event_ids_for_room(self, room_id, limit, end_token):
 
         return rows, token
 
-    def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
-        """Gets details of the first event in a room at or after a stream ordering
+    def get_room_event_before_stream_ordering(self, room_id, stream_ordering):
+        """Gets details of the first event in a room at or before a stream ordering
 
         Args:
             room_id (str):
Expand All @@ -541,15 +541,15 @@ def _f(txn):
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering >= ?"
" WHERE room_id = ? AND stream_ordering <= ?"
" AND NOT outlier"
" ORDER BY stream_ordering"
" ORDER BY stream_ordering DESC"
" LIMIT 1"
)
txn.execute(sql, (room_id, stream_ordering))
return txn.fetchone()

return self.db.runInteraction("get_room_event_after_stream_ordering", _f)
return self.db.runInteraction("get_room_event_before_stream_ordering", _f)

@defer.inlineCallbacks
def get_room_events_max_id(self, room_id=None):
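A minimal, self-contained sketch of why the query change matters (it uses sqlite3 and a toy events table that is only an approximation of Synapse's schema): for a quiet room whose newest event predates the purge cutoff, the old `>=` lookup finds nothing, so the room was skipped, while the new `<=` lookup returns the most recent event at or before the cutoff:

    # Toy demonstration, not Synapse's schema or code.
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (room_id TEXT, stream_ordering INTEGER,"
        " topological_ordering INTEGER, event_id TEXT, outlier INTEGER)"
    )
    # A quiet room whose newest event has stream_ordering 50.
    conn.executemany(
        "INSERT INTO events VALUES (?, ?, ?, ?, 0)",
        [("!quiet:example.com", 40, 1, "$a"), ("!quiet:example.com", 50, 2, "$b")],
    )

    cutoff = 100  # stream ordering of the first event after (now - max_lifetime)

    old_sql = (
        "SELECT stream_ordering, topological_ordering, event_id FROM events"
        " WHERE room_id = ? AND stream_ordering >= ? AND NOT outlier"
        " ORDER BY stream_ordering LIMIT 1"
    )
    new_sql = (
        "SELECT stream_ordering, topological_ordering, event_id FROM events"
        " WHERE room_id = ? AND stream_ordering <= ? AND NOT outlier"
        " ORDER BY stream_ordering DESC LIMIT 1"
    )

    print(conn.execute(old_sql, ("!quiet:example.com", cutoff)).fetchone())  # None
    print(conn.execute(new_sql, ("!quiet:example.com", cutoff)).fetchone())  # (50, 2, '$b')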