Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add scheduler pid info #570

Merged
merged 21 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions django_rq/templates/django_rq/stats.html
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<th>Host</th>
<th>Port</th>
<th>DB</th>
<th>Scheduler PID</th>
</tr>
</thead>
<tbody>
Expand Down Expand Up @@ -84,6 +85,7 @@
<td>{{ queue.connection_kwargs.host }}</td>
<td>{{ queue.connection_kwargs.port }}</td>
<td>{{ queue.connection_kwargs.db }}</td>
<td>{{ queue.scheduler_pid|default_if_none:"Inactive" }}</td>
</tr>
{% endfor %}
</tbody>
Expand Down
74 changes: 73 additions & 1 deletion django_rq/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from django_rq import thread_queue
from django_rq.templatetags.django_rq import force_escape, to_localtime
from django_rq.tests.fixtures import DummyJob, DummyQueue, DummyWorker
from django_rq.utils import get_jobs, get_statistics
from django_rq.utils import get_jobs, get_statistics, get_scheduler_pid
from django_rq.workers import get_worker, get_worker_class

try:
Expand Down Expand Up @@ -808,6 +808,78 @@ def test_force_escape_regular_string(self):
self.assertEqual(escaped_string, expected)



class SchedulerPIDTest(TestCase):

@skipIf(RQ_SCHEDULER_INSTALLED is False, 'RQ Scheduler not installed')
def test_scheduler_scheduler_pid_active(self):
queues = [{
'connection_config': {
'DB': 0,
'HOST': 'localhost',
'PORT': 6379,
},
'name': 'scheduler_scheduler_active'
}]
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
get_scheduler('scheduler_scheduler_active')
self.assertIsNotNone(get_scheduler_pid(queue=None)) # No queue object needed

@skipIf(RQ_SCHEDULER_INSTALLED is False, 'RQ Scheduler not installed')
def test_scheduler_scheduler_pid_inactive(self):
queues = [{
'connection_config': {
'DB': 0,
'HOST': 'localhost',
'PORT': 6379,
},
'name': 'scheduler_scheduler_inactive'
}]
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
scheduler = get_scheduler('scheduler_scheduler_inactive')
scheduler.register_death() # will mark the scheduler as death so get_scheduler_pid will return None
self.assertIsNone(get_scheduler_pid(queue=None)) # No queue object needed

@skipIf(RQ_SCHEDULER_INSTALLED is True, 'RQ Scheduler installed (no worker--with-scheduler)')
def test_worker_scheduler_pid_active(self):
'''The worker works as scheduler too if RQ Scheduler not installed, and the pid scheduler_pid is correct'''
queues = [{
'connection_config': {
'DB': 0,
'HOST': 'localhost',
'PORT': 6379,
},
'name': 'worker_scheduler_active'
}]
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
worker = get_worker('worker_scheduler_active', name=uuid4().hex)
worker.register_birth()
worker.work(with_scheduler=True) # force the worker to acquire a scheduler lock
self.assertIsNotNone(get_scheduler_pid(worker.queues[0]))
worker.register_death()

@skipIf(RQ_SCHEDULER_INSTALLED is True, 'RQ Scheduler installed (no worker--with-scheduler)')
def test_worker_scheduler_pid_inactive(self):
'''The worker works as scheduler too if RQ Scheduler not installed, and the pid scheduler_pid is correct'''
queues = [{
'connection_config': {
'DB': 0,
'HOST': 'localhost',
'PORT': 6379,
},
'name': 'worker_scheduler_inactive'
}]
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
worker = get_worker('worker_scheduler_inactive', name=uuid4().hex)
worker.register_birth()
worker.work(with_scheduler=False) # worker will not acquire lock, scheduler_pid should return None
self.assertIsNone(get_scheduler_pid(worker.queues[0]))
worker.register_death()

class UtilsTest(TestCase):
def test_get_statistics(self):
"""get_statistics() returns the right number of workers"""
Expand Down
35 changes: 34 additions & 1 deletion django_rq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,41 @@
from rq.worker import Worker
from rq.worker_registration import clean_worker_registry

from .queues import get_connection, get_queue_by_index

from .queues import get_connection, get_queue_by_index, get_scheduler
from .settings import QUEUES_LIST
from .templatetags.django_rq import to_localtime
from django.core.exceptions import ImproperlyConfigured

def get_scheduler_pid(queue):
'''Checks whether there's a scheduler-lock on a particular queue, and returns the PID.
It works by first checking if there's an RQ-Scheduler active.
If not, it checks the RQ's RQScheduler for a scheduler lock in the desired queue
Note: result might have some delay (1-15 minutes) but it helps visualizing whether the setup is working correcly
'''
try:
# first try to use rq-scheduler
scheduler = get_scheduler() # should fail if rq_scheduler not present
# currently, scheduler_lock_key is not used but, just in case, try
lock_key = scheduler.scheduler_lock_key
with scheduler.connection.pipeline() as p:
if _ := p.get(lock_key):
return scheduler.pid
else:
for key in p.keys(f"{scheduler.redis_scheduler_namespace_prefix}*"):
if not p.hexists(key, 'death'):
return scheduler.pid
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reasons the lock_key was not always there.
So, the extra check for the scheduler instance.

except ImproperlyConfigured:
if not queue:
raise ValueError("queue argument not defined for rq's Scheduler")
from rq.scheduler import RQScheduler
# When a scheduler acquires a lock it adds an expiring key: (e.g: rq:scheduler-lock:<queue.name>)
# If the key exists
if pid := queue.connection.get(RQScheduler.get_locking_key(queue.name)):
return pid.decode()
except Exception as e:
pass # Return None
return None


def get_statistics(run_maintenance_tasks=False):
Expand Down Expand Up @@ -47,6 +79,7 @@ def get_statistics(run_maintenance_tasks=False):
'oldest_job_timestamp': oldest_job_timestamp,
'index': index,
'connection_kwargs': connection_kwargs,
'scheduler_pid': get_scheduler_pid(queue),
}

connection = get_connection(queue.name)
Expand Down