Skip to content

Commit

Permalink
Done implementing clean_intermediate_queue()
Browse files Browse the repository at this point in the history
  • Loading branch information
selwin committed Apr 28, 2024
1 parent aae4285 commit a7f19d9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 60 deletions.
70 changes: 46 additions & 24 deletions tests/test_intermediate_queue.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import unittest

from datetime import datetime, timedelta, timezone
from unittest.mock import patch

from redis import Redis

from rq import Queue, Worker
from rq.intermediate_queue import IntermediateQueue
from rq.maintenance import clean_intermediate_queue
from rq.utils import get_version
from tests import RQTestCase
from tests.fixtures import say_hello
Expand Down Expand Up @@ -84,26 +84,48 @@ def test_get_job_ids(self):
intermediate_queue.remove(job_1.id)
self.assertEqual(intermediate_queue.get_job_ids(), [job_2.id])

# def test_cleanup_intermediate_queue(self):
# """Ensure jobs stuck in the intermediate queue are cleaned up."""
# queue = Queue('foo', connection=self.connection)
# job = queue.enqueue(say_hello)

# intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
# self.connection.delete(intermediate_queue.key)

# # If job execution fails after it's dequeued, job should be in the intermediate queue
# # and it's status is still QUEUED
# with patch.object(Worker, 'execute_job'):
# worker = Worker(queue, connection=self.testconn)
# worker.work(burst=True)

# self.assertEqual(job.get_status(), 'queued')
# self.assertFalse(job.id in queue.get_job_ids())
# self.assertIsNotNone(self.connection.lpos(queue.intermediate_queue_key, job.id))
# # After cleaning up the intermediate queue, job status should be `failed`
# # and job is also removed from the intermediate queue
# intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
# intermediate_queue.remove(job.id)
# self.assertEqual(job.get_status(), 'failed')
# self.assertIsNone(self.connection.lpos(queue.intermediate_queue_key, job.id))
def test_cleanup_intermediate_queue(self):
"""Ensure jobs stuck in the intermediate queue are cleaned up."""
queue = Queue('foo', connection=self.connection)
job = queue.enqueue(say_hello)

intermediate_queue = IntermediateQueue(queue.key, connection=self.connection)
self.connection.delete(intermediate_queue.key)

# If job execution fails after it's dequeued, job should be in the intermediate queue
# and it's status is still QUEUED
with patch.object(Worker, 'execute_job'):
worker = Worker(queue, connection=self.testconn)
worker.work(burst=True)

# If worker.execute_job() does nothing, job status should be `queued`
# even though it's not in the queue, but it should be in the intermediate queue
self.assertEqual(job.get_status(), 'queued')
self.assertFalse(job.id in queue.get_job_ids())
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])

self.assertIsNone(intermediate_queue.get_first_seen(job.id))
clean_intermediate_queue(worker, queue)
# After clean_intermediate_queue is called, the job should be marked as seen,
# but since it's been less than 1 minute, it should not be cleaned up
self.assertIsNotNone(intermediate_queue.get_first_seen(job.id))
self.assertFalse(intermediate_queue.should_be_cleaned_up(job.id))
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])

# If we set the first seen timestamp to 2 minutes ago, the job should be cleaned up
first_seen_key = intermediate_queue.get_first_seen_key(job.id)
two_minutes_ago = datetime.now(tz=timezone.utc) - timedelta(minutes=2)
self.connection.set(first_seen_key, two_minutes_ago.timestamp(), ex=10)

clean_intermediate_queue(worker, queue)
self.assertEqual(intermediate_queue.get_job_ids(), [])
self.assertEqual(job.get_status(), 'failed')

job = queue.enqueue(say_hello)
worker.work(burst=True)
self.assertEqual(intermediate_queue.get_job_ids(), [job.id])

# If job is gone, it should be immediately removed from the intermediate queue
job.delete()
clean_intermediate_queue(worker, queue)
self.assertEqual(intermediate_queue.get_job_ids(), [])
36 changes: 0 additions & 36 deletions tests/test_maintenance.py

This file was deleted.

0 comments on commit a7f19d9

Please sign in to comment.