diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py
index 2000a77c260c5e..5b99f8e44b2339 100644
--- a/src/sentry/conf/server.py
+++ b/src/sentry/conf/server.py
@@ -2815,8 +2815,11 @@ def build_cdc_postgres_init_db_volume(settings):
 # for synchronization/progress report.
 SENTRY_REPROCESSING_SYNC_REDIS_CLUSTER = "default"
 
-# How long can reprocessing take before we start deleting its Redis keys?
-SENTRY_REPROCESSING_SYNC_TTL = 3600 * 24
+# How long tombstones from reprocessing will live.
+SENTRY_REPROCESSING_TOMBSTONES_TTL = 24 * 3600
+
+# How long reprocessing counters are kept in Redis before they expire.
+SENTRY_REPROCESSING_SYNC_TTL = 30 * 24 * 3600  # 30 days
 
 # How many events to query for at once while paginating through an entire
 # issue. Note that this needs to be kept in sync with the time-limits on
diff --git a/src/sentry/reprocessing2.py b/src/sentry/reprocessing2.py
index 25a4e5d4885656..7175f42b7819f5 100644
--- a/src/sentry/reprocessing2.py
+++ b/src/sentry/reprocessing2.py
@@ -20,9 +20,11 @@
    preprocess_event. The event payload is taken from a backup that was made on
    first ingestion in preprocess_event.
 
-3. wait_group_reprocessed in sentry.tasks.reprocessing2 polls a counter in
-   Redis to see if reprocessing is done. When it reaches zero, all associated
-   models like assignee and activity are moved into the new group.
+3. `mark_event_reprocessed` will decrement the pending event counter in Redis
+   to see if reprocessing is done.
+
+   When the counter reaches zero, it will trigger the `finish_reprocessing` task,
+   which will move all associated models like assignee and activity into the new group.
 
    A group redirect is installed. The old group is deleted, while the new
    group is unresolved. This effectively unsets the REPROCESSING status.
@@ -397,12 +399,12 @@ def buffered_delete_old_primary_hash(
     if old_primary_hash is not None and old_primary_hash != current_primary_hash:
         event_key = _get_old_primary_hash_subset_key(project_id, group_id, old_primary_hash)
         client.lpush(event_key, f"{to_timestamp(datetime)};{event_id}")
-        client.expire(event_key, settings.SENTRY_REPROCESSING_SYNC_TTL)
+        client.expire(event_key, settings.SENTRY_REPROCESSING_TOMBSTONES_TTL)
 
         if old_primary_hash not in old_primary_hashes:
             old_primary_hashes.add(old_primary_hash)
             client.sadd(primary_hash_set_key, old_primary_hash)
-            client.expire(primary_hash_set_key, settings.SENTRY_REPROCESSING_SYNC_TTL)
+            client.expire(primary_hash_set_key, settings.SENTRY_REPROCESSING_TOMBSTONES_TTL)
 
     with sentry_sdk.configure_scope() as scope:
         scope.set_tag("project_id", project_id)
@@ -497,10 +499,6 @@ def buffered_handle_remaining_events(
     client = _get_sync_redis_client()
     # We explicitly cluster by only project_id and group_id here such that our
     # RENAME command later succeeds.
-    #
-    # We also use legacy string formatting here because new-style Python
-    # formatting is quite confusing when the output string is supposed to
-    # contain {}.
key = f"re2:remaining:{{{project_id}:{old_group_id}}}" if datetime_to_event: @@ -579,8 +577,12 @@ def mark_event_reprocessed(data=None, group_id=None, project_id=None, num_events project_id = data["project"] + client = _get_sync_redis_client() + # refresh the TTL of the metadata: + client.expire(_get_info_reprocessed_key(group_id), settings.SENTRY_REPROCESSING_SYNC_TTL) key = _get_sync_counter_key(group_id) - if _get_sync_redis_client().decrby(key, num_events) == 0: + client.expire(key, settings.SENTRY_REPROCESSING_SYNC_TTL) + if client.decrby(key, num_events) == 0: from sentry.tasks.reprocessing2 import finish_reprocessing finish_reprocessing.delay(project_id=project_id, group_id=group_id)