From 7af5dcff616e176b8f4bec02bbc5368fa9431ca5 Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Sat, 4 Oct 2025 17:57:31 +0530 Subject: [PATCH] Add webhook metadata cleanup script and enhance Redis cleanup logic - Introduced a new script for cleaning up old webhook metadata in Redis, allowing for batch processing and dry run options. - Updated existing Redis cleanup script to handle additional patterns and results for user operations, EIP-7702, webhooks, and external bundler sends. - Improved error handling and statistics tracking for various cleanup operations. - Added tests to verify pruning behavior with randomly generated job IDs to ensure correctness in job metadata management. --- scripts/cleanup-webhook-meta.ts | 206 +++++++++++++++++ scripts/simple-redis-cleanup.ts | 87 +++++++- twmq/src/lib.rs | 30 ++- twmq/tests/prune_race_random_ids.rs | 329 ++++++++++++++++++++++++++++ 4 files changed, 644 insertions(+), 8 deletions(-) create mode 100644 scripts/cleanup-webhook-meta.ts create mode 100644 twmq/tests/prune_race_random_ids.rs diff --git a/scripts/cleanup-webhook-meta.ts b/scripts/cleanup-webhook-meta.ts new file mode 100644 index 0000000..9e427d9 --- /dev/null +++ b/scripts/cleanup-webhook-meta.ts @@ -0,0 +1,206 @@ +#!/usr/bin/env bun + +import Redis from "ioredis"; + +if (!process.env.REDIS_URL) { + throw new Error("REDIS_URL is not set"); +} + +// Configuration +const CONFIG = { + redisUrl: process.env.REDIS_URL, + batchSize: 5000, + dryRun: false, // Set to false to actually delete + maxAgeHours: 3, // Delete jobs finished more than 3 hours ago +} as const; + +class WebhookMetaCleanup { + private redis: Redis; + private stats = { + totalScanned: 0, + totalDeleted: 0, + totalSkipped: 0, + errors: 0, + invalidTimestamps: 0, + }; + + constructor() { + this.redis = new Redis(CONFIG.redisUrl); + } + + async run(): Promise { + console.log(`🚀 Starting cleanup (DRY_RUN: ${CONFIG.dryRun})`); + console.log("🎯 Target pattern:"); + console.log(" - twmq:engine-cloud_webhook:job:*:meta"); + console.log(` - Max age: ${CONFIG.maxAgeHours} hours`); + console.log(""); + + try { + await this.cleanOldJobMeta(); + this.printFinalStats(); + } catch (error) { + console.error(`💥 Fatal error: ${error}`); + throw error; + } finally { + await this.redis.quit(); + } + } + + private async cleanOldJobMeta(): Promise { + const pattern = "twmq:engine-cloud_webhook:job:*:meta"; + console.log(`🔍 Scanning pattern: ${pattern}`); + + let cursor = "0"; + // Unix timestamps are always in UTC (seconds since Jan 1, 1970 00:00:00 UTC) + const now = Math.floor(Date.now() / 1000); + const cutoffTimestamp = now - (CONFIG.maxAgeHours * 60 * 60); + + console.log(` Current time (UTC): ${now} (${new Date(now * 1000).toISOString()})`); + console.log(` Cutoff time (UTC): ${cutoffTimestamp} (${new Date(cutoffTimestamp * 1000).toISOString()})`); + console.log(""); + + do { + const [newCursor, keys] = await this.redis.scan( + cursor, + "MATCH", + pattern, + "COUNT", + CONFIG.batchSize + ); + cursor = newCursor; + + if (keys.length > 0) { + this.stats.totalScanned += keys.length; + console.log(` Scanned ${keys.length} keys (total: ${this.stats.totalScanned})`); + + await this.processKeyBatch(keys, cutoffTimestamp); + } + } while (cursor !== "0"); + + console.log(`✅ Scan complete: ${pattern} (scanned ${this.stats.totalScanned} keys)`); + console.log(""); + } + + private async processKeyBatch(keys: string[], cutoffTimestamp: number): Promise { + const keysToDelete: string[] = []; + + // Batch fetch all finished_at timestamps 
using pipeline
+    const pipeline = this.redis.pipeline();
+    for (const key of keys) {
+      pipeline.hget(key, "finished_at");
+    }
+
+    let results;
+    try {
+      results = await pipeline.exec();
+    } catch (error) {
+      console.error(`  💥 Error fetching timestamps batch: ${error}`);
+      this.stats.errors += keys.length;
+      return;
+    }
+
+    // Process results
+    for (let i = 0; i < keys.length; i++) {
+      const key = keys[i];
+      if (!key) continue;
+
+      const [err, finishedAt] = results?.[i] ?? [null, null];
+
+      if (err) {
+        console.error(`  💥 Error processing key ${key}: ${err}`);
+        this.stats.errors += 1;
+        continue;
+      }
+
+      if (!finishedAt) {
+        this.stats.totalSkipped += 1;
+        continue;
+      }
+
+      const finishedAtTimestamp = parseInt(finishedAt as string, 10);
+
+      if (isNaN(finishedAtTimestamp)) {
+        this.stats.invalidTimestamps += 1;
+        continue;
+      }
+
+      if (finishedAtTimestamp < cutoffTimestamp) {
+        const age = Math.floor((Date.now() / 1000 - finishedAtTimestamp) / 3600);
+        if (keysToDelete.length < 10) {
+          // Only log first 10 to avoid spam
+          console.log(`  🗑️ Marking for deletion: ${key} (finished ${age}h ago)`);
+        }
+        keysToDelete.push(key);
+      } else {
+        this.stats.totalSkipped += 1;
+      }
+    }
+
+    // Delete the marked keys
+    if (keysToDelete.length > 0) {
+      console.log(`  Found ${keysToDelete.length} keys to delete in this batch`);
+      if (CONFIG.dryRun) {
+        console.log(`  [DRY RUN] Would delete ${keysToDelete.length} keys`);
+        this.stats.totalDeleted += keysToDelete.length;
+      } else {
+        await this.deleteKeys(keysToDelete);
+      }
+    }
+  }
+
+  private async deleteKeys(keys: string[]): Promise<void> {
+    try {
+      const pipeline = this.redis.pipeline();
+      for (const key of keys) {
+        pipeline.del(key);
+      }
+
+      const results = await pipeline.exec();
+      const deletedCount = results?.filter(([err]) => err === null).length || 0;
+      const failedCount = keys.length - deletedCount;
+
+      console.log(`  ✅ Deleted ${deletedCount} keys`);
+      if (failedCount > 0) {
+        console.log(`  ❌ Failed to delete ${failedCount} keys`);
+        this.stats.errors += failedCount;
+      }
+
+      this.stats.totalDeleted += deletedCount;
+    } catch (error) {
+      console.error(`  💥 Error deleting batch: ${error}`);
+      this.stats.errors += keys.length;
+    }
+  }
+
+  private printFinalStats(): void {
+    console.log("📈 Final Statistics:");
+    console.log(`  Total Scanned: ${this.stats.totalScanned.toLocaleString()}`);
+    console.log(`  Total ${CONFIG.dryRun ? 
'Would Delete' : 'Deleted'}: ${this.stats.totalDeleted.toLocaleString()}`); + console.log(` Total Skipped (not old enough): ${this.stats.totalSkipped.toLocaleString()}`); + if (this.stats.invalidTimestamps > 0) { + console.log(` Invalid Timestamps: ${this.stats.invalidTimestamps.toLocaleString()}`); + } + if (this.stats.errors > 0) { + console.log(` Errors: ${this.stats.errors.toLocaleString()}`); + } + console.log(""); + + if (CONFIG.dryRun) { + console.log("💡 This was a DRY RUN - no data was actually deleted"); + console.log("💡 Set CONFIG.dryRun = false to actually delete the keys"); + } else { + console.log("✅ CLEANUP COMPLETED - Data has been permanently deleted"); + } + } +} + +// Main execution +async function main() { + const cleaner = new WebhookMetaCleanup(); + await cleaner.run(); +} + +if (import.meta.main) { + main().catch(console.error); +} + diff --git a/scripts/simple-redis-cleanup.ts b/scripts/simple-redis-cleanup.ts index 0d69684..602323f 100644 --- a/scripts/simple-redis-cleanup.ts +++ b/scripts/simple-redis-cleanup.ts @@ -17,7 +17,13 @@ class SimpleRedisCleanup { private redis: Redis; private stats = { useropErrors: 0, + useropResults: 0, eip7702Errors: 0, + eip7702Results: 0, + webhookErrors: 0, + webhookResults: 0, + externalBundlerErrors: 0, + externalBundlerResults: 0, totalDeleted: 0, errors: 0, }; @@ -30,15 +36,31 @@ class SimpleRedisCleanup { console.log(`🚀 Starting cleanup (DRY_RUN: ${CONFIG.dryRun})`); console.log("🎯 Target patterns:"); console.log(" - twmq:engine-cloud_userop_confirm:job:*:errors"); + console.log(" - twmq:engine-cloud_userop_confirm:jobs:result (hash)"); console.log(" - twmq:engine-cloud_eip7702_send:job:*:errors"); + console.log(" - twmq:engine-cloud_eip7702_send:jobs:result (hash)"); + console.log(" - twmq:engine-cloud_webhook:job:*:errors"); + console.log(" - twmq:engine-cloud_webhook:jobs:result (hash)"); + console.log(" - twmq:engine-cloud_external_bundler_send:job:*:errors"); + console.log(" - twmq:engine-cloud_external_bundler_send:jobs:result (hash)"); console.log(""); try { - // Clean userop confirm error keys + // Clean userop confirm keys await this.cleanPattern("twmq:engine-cloud_userop_confirm:job:*:errors"); + await this.cleanHash("twmq:engine-cloud_userop_confirm:jobs:result", "userop_confirm"); - // Clean eip7702 send error keys + // Clean eip7702 send keys await this.cleanPattern("twmq:engine-cloud_eip7702_send:job:*:errors"); + await this.cleanHash("twmq:engine-cloud_eip7702_send:jobs:result", "eip7702_send"); + + // Clean webhook keys + await this.cleanPattern("twmq:engine-cloud_webhook:job:*:errors"); + await this.cleanHash("twmq:engine-cloud_webhook:jobs:result", "webhook"); + + // Clean external bundler send keys + await this.cleanPattern("twmq:engine-cloud_external_bundler_send:job:*:errors"); + await this.cleanHash("twmq:engine-cloud_external_bundler_send:jobs:result", "external_bundler_send"); this.printFinalStats(); } catch (error) { @@ -83,6 +105,37 @@ class SimpleRedisCleanup { console.log(""); } + private async cleanHash(key: string, queueType: string): Promise { + console.log(`🔍 Checking hash: ${key}`); + + try { + const exists = await this.redis.exists(key); + + if (exists) { + const fieldCount = await this.redis.hlen(key); + console.log(` Found hash with ${fieldCount} fields`); + + if (CONFIG.dryRun) { + console.log(` [DRY RUN] Would delete hash with ${fieldCount} fields`); + this.updateStatsForHash(queueType, fieldCount); + } else { + await this.redis.del(key); + console.log(` ✅ Deleted hash with ${fieldCount} 
fields`);
+          this.updateStatsForHash(queueType, fieldCount);
+          this.stats.totalDeleted += 1;
+        }
+      } else {
+        console.log(`  Hash does not exist`);
+      }
+
+      console.log(`✅ Hash complete: ${key}`);
+      console.log("");
+    } catch (error) {
+      console.error(`  💥 Error handling hash: ${error}`);
+      this.stats.errors += 1;
+    }
+  }
+
   private async deleteKeys(keys: string[]): Promise<void> {
     try {
       const pipeline = this.redis.pipeline();
@@ -112,13 +165,39 @@ class SimpleRedisCleanup {
       this.stats.useropErrors += count;
     } else if (pattern.includes("eip7702_send")) {
       this.stats.eip7702Errors += count;
+    } else if (pattern.includes("webhook")) {
+      this.stats.webhookErrors += count;
+    } else if (pattern.includes("external_bundler_send")) {
+      this.stats.externalBundlerErrors += count;
+    }
+  }
+
+  private updateStatsForHash(queueType: string, count: number): void {
+    if (queueType === "userop_confirm") {
+      this.stats.useropResults += count;
+    } else if (queueType === "eip7702_send") {
+      this.stats.eip7702Results += count;
+    } else if (queueType === "webhook") {
+      this.stats.webhookResults += count;
+    } else if (queueType === "external_bundler_send") {
+      this.stats.externalBundlerResults += count;
     }
   }
 
   private printFinalStats(): void {
     console.log("📈 Final Statistics:");
-    console.log(`  Userop Confirm Errors: ${this.stats.useropErrors.toLocaleString()}`);
-    console.log(`  EIP-7702 Send Errors: ${this.stats.eip7702Errors.toLocaleString()}`);
+    console.log(`  Userop Confirm:`);
+    console.log(`    - Errors: ${this.stats.useropErrors.toLocaleString()}`);
+    console.log(`    - Result Hash Fields: ${this.stats.useropResults.toLocaleString()}`);
+    console.log(`  EIP-7702 Send:`);
+    console.log(`    - Errors: ${this.stats.eip7702Errors.toLocaleString()}`);
+    console.log(`    - Result Hash Fields: ${this.stats.eip7702Results.toLocaleString()}`);
+    console.log(`  Webhook:`);
+    console.log(`    - Errors: ${this.stats.webhookErrors.toLocaleString()}`);
+    console.log(`    - Result Hash Fields: ${this.stats.webhookResults.toLocaleString()}`);
+    console.log(`  External Bundler Send:`);
+    console.log(`    - Errors: ${this.stats.externalBundlerErrors.toLocaleString()}`);
+    console.log(`    - Result Hash Fields: ${this.stats.externalBundlerResults.toLocaleString()}`);
     console.log(`  Total ${CONFIG.dryRun ? 'Would Delete' : 'Deleted'}: ${this.stats.totalDeleted.toLocaleString()}`);
     if (this.stats.errors > 0) {
       console.log(`  Errors: ${this.stats.errors}`);
diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs
index a9786d5..853b7fd 100644
--- a/twmq/src/lib.rs
+++ b/twmq/src/lib.rs
@@ -960,9 +960,14 @@ impl Queue {
                     for _, j_id in ipairs(job_ids_to_delete) do
                         -- CRITICAL FIX: Check if this job_id is currently active/pending/delayed
                         -- This prevents the race where we prune metadata for a job that's currently running
+                        -- or about to run (pending). LPOS is O(N) but necessary for correctness when
+                        -- job IDs are reused (e.g., eoa_address_chainId pattern).
                         local is_active = redis.call('HEXISTS', active_hash, j_id) == 1
-                        local is_pending = redis.call('LPOS', pending_list, j_id) ~= nil
-                        local is_delayed = redis.call('ZSCORE', delayed_zset, j_id) ~= nil
+                        -- CRITICAL: Redis nil bulk reply converts to Lua `false`, not `nil`!
+                        local lpos_result = redis.call('LPOS', pending_list, j_id)
+                        local is_pending = type(lpos_result) == "number"
+                        local zscore_result = redis.call('ZSCORE', delayed_zset, j_id)
+                        local is_delayed = type(zscore_result) == "number"
 
                         -- Only delete if the job is NOT currently in the system
                         if not is_active and not is_pending and not is_delayed then
@@ -979,6 +984,7 @@ impl Queue {
 
                     end
                     redis.call('LTRIM', list_name, 0, max_len - 1)
                 end
+                return actually_deleted
             "#,
         );
@@ -998,6 +1004,11 @@ impl Queue {
 
         if trimmed_count > 0 {
             tracing::info!("Pruned {} successful jobs", trimmed_count);
+        } else {
+            tracing::debug!(
+                queue = self.name(),
+                "Pruning ran but deleted 0 jobs (all were protected or none eligible)"
+            );
         }
 
         Ok(())
@@ -1128,9 +1139,15 @@ impl Queue {
                 if #job_ids_to_delete > 0 then
                     for _, j_id in ipairs(job_ids_to_delete) do
                         -- CRITICAL FIX: Check if this job_id is currently active/pending/delayed
+                        -- This prevents the race where we prune metadata for a job that's currently running
+                        -- or about to run (pending). LPOS is O(N) but necessary for correctness when
+                        -- job IDs are reused (e.g., eoa_address_chainId pattern).
                         local is_active = redis.call('HEXISTS', active_hash, j_id) == 1
-                        local is_pending = redis.call('LPOS', pending_list, j_id) ~= nil
-                        local is_delayed = redis.call('ZSCORE', delayed_zset, j_id) ~= nil
+                        -- CRITICAL: Redis nil bulk reply converts to Lua `false`, not `nil`!
+                        local lpos_result = redis.call('LPOS', pending_list, j_id)
+                        local is_pending = type(lpos_result) == "number"
+                        local zscore_result = redis.call('ZSCORE', delayed_zset, j_id)
+                        local is_delayed = type(zscore_result) == "number"
 
                         -- Only delete if the job is NOT currently in the system
                         if not is_active and not is_pending and not is_delayed then
@@ -1164,6 +1181,11 @@ impl Queue {
 
         if trimmed_count > 0 {
             tracing::info!("Pruned {} failed jobs", trimmed_count);
+        } else {
+            tracing::debug!(
+                queue = self.name(),
+                "Pruning ran but deleted 0 jobs (all were protected or none eligible)"
+            );
         }
 
         Ok(())
diff --git a/twmq/tests/prune_race_random_ids.rs b/twmq/tests/prune_race_random_ids.rs
new file mode 100644
index 0000000..21f436e
--- /dev/null
+++ b/twmq/tests/prune_race_random_ids.rs
@@ -0,0 +1,329 @@
+// Test to verify pruning works correctly with randomly generated job IDs
+// This tests the scenario where job IDs are unique (not reused), which should
+// allow more aggressive pruning without race conditions.
+ +mod fixtures; +use fixtures::TestJobErrorData; +use redis::{aio::ConnectionManager, AsyncCommands}; +use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; + +use std::{ + sync::{ + Arc, + atomic::{AtomicBool, AtomicU32, Ordering}, + }, + time::Duration, +}; + +use serde::{Deserialize, Serialize}; +use twmq::{ + DurableExecution, Queue, NackHookData, SuccessHookData, IdempotencyMode, + hooks::TransactionContext, + job::{BorrowedJob, JobError, JobResult, RequeuePosition}, + queue::QueueOptions, +}; + +const REDIS_URL: &str = "redis://127.0.0.1:6379/"; + +// Shared state to control test flow +static SHOULD_NACK: AtomicBool = AtomicBool::new(true); +static SUCCESS_COUNT: AtomicU32 = AtomicU32::new(0); +static NACK_COUNT: AtomicU32 = AtomicU32::new(0); + +// Job that simulates EOA executor behavior with random IDs +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct RandomJobPayload { + pub eoa: String, + pub chain_id: u64, + pub unique_id: String, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct RandomJobOutput { + pub message: String, + pub success_number: u32, +} + +pub struct RandomJobHandler; + +impl DurableExecution for RandomJobHandler { + type Output = RandomJobOutput; + type ErrorData = TestJobErrorData; + type JobData = RandomJobPayload; + + async fn process( + &self, + job: &BorrowedJob, + ) -> JobResult { + tracing::debug!( + job_id = ?job.job.id, + "Job processing started" + ); + + // Simulate work + tokio::time::sleep(Duration::from_millis(50)).await; + + let should_nack = SHOULD_NACK.load(Ordering::SeqCst); + + if should_nack { + let nack_num = NACK_COUNT.fetch_add(1, Ordering::SeqCst) + 1; + tracing::debug!( + job_id = ?job.job.id, + nack_count = nack_num, + "Job nacking" + ); + + Err(JobError::Nack { + error: TestJobErrorData { + reason: format!("Work remaining (nack #{})", nack_num), + }, + delay: Some(Duration::from_millis(50)), + position: RequeuePosition::Last, + }) + } else { + let success_num = SUCCESS_COUNT.fetch_add(1, Ordering::SeqCst) + 1; + tracing::debug!( + job_id = ?job.job.id, + success_count = success_num, + "Job succeeding" + ); + + Ok(RandomJobOutput { + message: format!("Success #{}", success_num), + success_number: success_num, + }) + } + } + + async fn on_success( + &self, + job: &BorrowedJob, + _d: SuccessHookData<'_, Self::Output>, + _tx: &mut TransactionContext<'_>, + ) { + tracing::debug!( + job_id = ?job.job.id, + "on_success hook called" + ); + } + + async fn on_nack( + &self, + job: &BorrowedJob, + _d: NackHookData<'_, Self::ErrorData>, + _tx: &mut TransactionContext<'_>, + ) { + tracing::debug!( + job_id = ?job.job.id, + "on_nack hook called" + ); + } +} + +// Helper to clean up Redis keys +async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { + let mut conn = conn_manager.clone(); + let keys_pattern = format!("twmq:{queue_name}:*"); + + let keys: Vec = redis::cmd("KEYS") + .arg(&keys_pattern) + .query_async(&mut conn) + .await + .unwrap_or_default(); + if !keys.is_empty() { + redis::cmd("DEL") + .arg(keys) + .query_async::<()>(&mut conn) + .await + .unwrap_or_default(); + } + tracing::info!("Cleaned up keys for pattern: {keys_pattern}"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_prune_with_random_ids() { + // Initialize tracing + let _ = tracing_subscriber::registry() + .with( + EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "twmq=debug,prune_race_random_ids=info".into()), + ) + 
.with(tracing_subscriber::fmt::layer()) + .try_init(); + + let queue_name = format!("test_random_ids_{}", nanoid::nanoid!(6)); + + // Aggressive pruning settings - only keep 1 successful job + let queue_options = QueueOptions { + local_concurrency: 2, + max_success: 1, // Aggressive pruning + max_failed: 10, + lease_duration: Duration::from_secs(3), + idempotency_mode: IdempotencyMode::Active, + ..Default::default() + }; + + tracing::info!("=== RANDOM ID PRUNING TEST ==="); + tracing::info!("Queue: {}", queue_name); + tracing::info!("Max success jobs: {}", queue_options.max_success); + tracing::info!("Testing pruning behavior with unique random job IDs"); + + // Reset test state + SHOULD_NACK.store(false, Ordering::SeqCst); // Start with successes + SUCCESS_COUNT.store(0, Ordering::SeqCst); + NACK_COUNT.store(0, Ordering::SeqCst); + + let handler = RandomJobHandler; + + // Create queue + let queue = Arc::new( + Queue::new( + REDIS_URL, + &queue_name, + Some(queue_options), + handler, + ) + .await + .expect("Failed to create queue"), + ); + + cleanup_redis_keys(&queue.redis, &queue_name).await; + + // Start two workers + let worker1 = queue.work(); + let worker2 = queue.work(); + + tracing::info!("Two workers started!"); + + // Push jobs with random IDs and let them succeed + for i in 0..100 { + let random_job_id = nanoid::nanoid!(16); // Random unique ID + + let job_payload = RandomJobPayload { + eoa: format!("0x{}", nanoid::nanoid!(8)), + chain_id: 137, + unique_id: random_job_id.clone(), + }; + + queue + .clone() + .job(job_payload) + .with_id(&random_job_id) + .push() + .await + .expect("Failed to push job"); + + if i % 10 == 0 { + tracing::info!("Pushed {} jobs", i + 1); + } + + // Small delay between pushes + tokio::time::sleep(Duration::from_millis(10)).await; + } + + // Wait for jobs to complete + tracing::info!("Waiting for jobs to complete..."); + for _ in 0..100 { + let success_count = SUCCESS_COUNT.load(Ordering::SeqCst); + if success_count >= 100 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + + let final_success = SUCCESS_COUNT.load(Ordering::SeqCst); + + // Check Redis directly to see if pruning occurred + let mut conn = queue.redis.clone(); + let success_list_len: usize = conn.llen(queue.success_list_name()).await.unwrap(); + let pending_list_len: usize = conn.llen(queue.pending_list_name()).await.unwrap(); + let delayed_zset_len: usize = conn.zcard(queue.delayed_zset_name()).await.unwrap(); + let active_hash_len: usize = conn.hlen(queue.active_hash_name()).await.unwrap(); + + // Get actual job IDs in success list + let success_job_ids: Vec = conn.lrange(queue.success_list_name(), 0, -1).await.unwrap(); + + // Count how many job metadata hashes still exist (should match success list length if pruning works) + let meta_pattern = format!("twmq:{}:job:*:meta", queue.name()); + let meta_keys: Vec = redis::cmd("KEYS") + .arg(&meta_pattern) + .query_async(&mut conn) + .await + .unwrap_or_default(); + let metadata_count = meta_keys.len(); + + // Count job data entries + let job_data_count: usize = conn.hlen(queue.job_data_hash_name()).await.unwrap(); + + // Get what's in pending/delayed/active to understand why pruning might be blocked + let pending_jobs: Vec = conn.lrange(queue.pending_list_name(), 0, -1).await.unwrap_or_default(); + let delayed_jobs: Vec = redis::cmd("ZRANGE") + .arg(queue.delayed_zset_name()) + .arg(0) + .arg(-1) + .query_async(&mut conn) + .await + .unwrap_or_default(); + let active_jobs: Vec = 
conn.hkeys(queue.active_hash_name()).await.unwrap_or_default(); + + // Get debug info from Lua script + let debug_candidates: String = conn.get("debug:candidates_count").await.unwrap_or_else(|_| "N/A".to_string()); + let debug_active_count: String = conn.get("debug:active_count").await.unwrap_or_else(|_| "N/A".to_string()); + let debug_delayed_count: String = conn.get("debug:delayed_count").await.unwrap_or_else(|_| "N/A".to_string()); + let debug_blocked_job: String = conn.get("debug:last_blocked_job_id").await.unwrap_or_else(|_| "N/A".to_string()); + let debug_hexists: String = conn.get("debug:last_hexists_result").await.unwrap_or_else(|_| "N/A".to_string()); + let debug_zscore: String = conn.get("debug:last_zscore_result").await.unwrap_or_else(|_| "N/A".to_string()); + let debug_is_active: String = conn.get("debug:last_is_active").await.unwrap_or_else(|_| "N/A".to_string()); + let debug_is_delayed: String = conn.get("debug:last_is_delayed").await.unwrap_or_else(|_| "N/A".to_string()); + + tracing::info!("=== RESULTS ==="); + tracing::info!("Total successes: {}", final_success); + tracing::info!("Total nacks: {}", NACK_COUNT.load(Ordering::SeqCst)); + tracing::info!(""); + tracing::info!("=== Redis State ==="); + tracing::info!("Success list length: {}", success_list_len); + tracing::info!("Pending list length: {}", pending_list_len); + tracing::info!("Delayed zset length: {}", delayed_zset_len); + tracing::info!("Active hash length: {}", active_hash_len); + tracing::info!("Job metadata count: {}", metadata_count); + tracing::info!("Job data hash entries: {}", job_data_count); + tracing::info!(""); + tracing::info!("=== Job IDs (for leak investigation) ==="); + tracing::info!("Success list job IDs: {:?}", success_job_ids); + tracing::info!("Pending list job IDs: {:?}", pending_jobs); + tracing::info!("Delayed zset job IDs: {:?}", delayed_jobs); + tracing::info!("Active hash job IDs: {:?}", active_jobs); + tracing::info!(""); + tracing::info!("=== Lua Script Debug Info ==="); + tracing::info!("Candidates to delete (last run): {}", debug_candidates); + tracing::info!("Active count (at check time): {}", debug_active_count); + tracing::info!("Delayed count (at check time): {}", debug_delayed_count); + tracing::info!("First blocked job ID: {}", debug_blocked_job); + tracing::info!(" HEXISTS result: {}", debug_hexists); + tracing::info!(" ZSCORE result: {}", debug_zscore); + tracing::info!(" is_active (HEXISTS==1): {}", debug_is_active); + tracing::info!(" is_delayed (ZSCORE~=nil): {}", debug_is_delayed); + tracing::info!(""); + tracing::info!("Max success setting: {}", queue.options.max_success); + + if success_list_len <= queue.options.max_success { + tracing::info!("✅ List pruning is working - success list is within max_success limit"); + } else { + tracing::warn!("⚠️ Success list ({}) exceeds max_success ({})", + success_list_len, queue.options.max_success); + tracing::warn!(" This might indicate list pruning is not working correctly"); + } + + if metadata_count == success_list_len { + tracing::info!("✅ Metadata cleanup is working - metadata count matches list length"); + } else { + tracing::warn!("⚠️ Metadata leak detected!"); + tracing::warn!(" Job metadata hashes: {}, Success list length: {}", metadata_count, success_list_len); + tracing::warn!(" {} job metadata entries were not cleaned up", metadata_count.saturating_sub(success_list_len)); + } + + worker1.shutdown().await.unwrap(); + worker2.shutdown().await.unwrap(); + cleanup_redis_keys(&queue.redis, &queue_name).await; +} +
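
Reviewer note (not part of the patch): the heart of the twmq/src/lib.rs change is the conversion rule called out in the Lua comment above. Under Redis's Lua script bridge, a nil bulk reply is converted to the Lua boolean `false`, so the old `redis.call('LPOS', ...) ~= nil` and `redis.call('ZSCORE', ...) ~= nil` checks were always true and the guarded metadata deletion could never run. The standalone Lua sketch below (the probed list key and job id are placeholders, not queue internals) shows the difference between the old and new pending check:

    -- Sketch of the gotcha fixed above: KEYS[1] is any list, ARGV[1] a job id.
    local lpos_result = redis.call('LPOS', KEYS[1], ARGV[1])

    -- Old check: an absent id yields a nil bulk reply, which the Lua bridge
    -- converts to `false`; since `false ~= nil` is true, this is always true.
    local broken_is_pending = lpos_result ~= nil

    -- New check: LPOS returns an integer reply (a Lua number) only on a real hit.
    local fixed_is_pending = type(lpos_result) == "number"

    return { tostring(broken_is_pending), tostring(fixed_is_pending) }

Run against a list that does not contain the id (for example `redis-cli EVAL "$(cat sketch.lua)" 1 tmp:pending job-1`), this returns "true" for the old check and "false" for the new one, which is why the pruning scripts now test the reply's type instead of comparing it to nil.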