diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index 0cd7fb0760..2c70f973ab 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -5,6 +5,7 @@ from rq import get_current_job from rq.job import JobStatus from rq.timeouts import JobTimeoutException +from rq.exceptions import NoSuchJobError from redash import models, redis_connection, settings from redash.query_runner import InterruptException @@ -43,24 +44,22 @@ def enqueue_query( job_id = pipe.get(_job_lock_id(query_hash, data_source.id)) if job_id: logger.info("[%s] Found existing job: %s", query_hash, job_id) - job_exists = Job.exists(job_id) job_complete = None - if job_exists: + try: job = Job.fetch(job_id) + job_exists = True status = job.get_status() job_complete = status in [JobStatus.FINISHED, JobStatus.FAILED] if job_complete: - logger.info( - "[%s] job found is complete (%s), removing lock", - query_hash, - status, - ) - else: - logger.info("[%s] job found has expired, removing lock", query_hash) + message = "job found is complete (%s)" % status + except NoSuchJobError: + message = "job found has expired" + job_exists = False if job_complete or not job_exists: + logger.info("[%s] %s, removing lock", query_hash, message) redis_connection.delete(_job_lock_id(query_hash, data_source.id)) job = None diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py index 03c3da9e63..063e7c2780 100644 --- a/tests/tasks/test_queries.py +++ b/tests/tasks/test_queries.py @@ -4,6 +4,7 @@ from mock import patch, Mock from rq import Connection +from rq.exceptions import NoSuchJobError from tests import BaseTestCase from redash import redis_connection, rq_redis_connection, models @@ -33,11 +34,10 @@ def create_job(*args, **kwargs): return Job(connection=rq_redis_connection) -@patch("redash.tasks.queries.execution.Job.exists", return_value=True) @patch("redash.tasks.queries.execution.Job.fetch", side_effect=fetch_job) @patch("redash.tasks.queries.execution.Queue.enqueue", side_effect=create_job) class TestEnqueueTask(BaseTestCase): - def test_multiple_enqueue_of_same_query(self, enqueue, _, __): + def test_multiple_enqueue_of_same_query(self, enqueue, _): query = self.factory.create_query() with Connection(rq_redis_connection): @@ -68,7 +68,7 @@ def test_multiple_enqueue_of_same_query(self, enqueue, _, __): self.assertEqual(1, enqueue.call_count) - def test_multiple_enqueue_of_expired_job(self, enqueue, _, exists): + def test_multiple_enqueue_of_expired_job(self, enqueue, fetch_job): query = self.factory.create_query() with Connection(rq_redis_connection): @@ -81,7 +81,8 @@ def test_multiple_enqueue_of_expired_job(self, enqueue, _, exists): {"Username": "Arik", "Query ID": query.id}, ) - exists.return_value = False + # "expire" the previous job + fetch_job.side_effect = NoSuchJobError enqueue_query( query.query_text, @@ -95,7 +96,7 @@ def test_multiple_enqueue_of_expired_job(self, enqueue, _, exists): self.assertEqual(2, enqueue.call_count) @patch("redash.settings.dynamic_settings.query_time_limit", return_value=60) - def test_limits_query_time(self, _, enqueue, __, ___): + def test_limits_query_time(self, _, enqueue, __): query = self.factory.create_query() with Connection(rq_redis_connection): @@ -111,7 +112,7 @@ def test_limits_query_time(self, _, enqueue, __, ___): _, kwargs = enqueue.call_args self.assertEqual(60, kwargs.get("job_timeout")) - def test_multiple_enqueue_of_different_query(self, enqueue, _, __): + def test_multiple_enqueue_of_different_query(self, enqueue, _): query = self.factory.create_query() with Connection(rq_redis_connection):