Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add scheduler pid info #570

Merged
merged 21 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion django_rq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
class Queue(models.Model):
"""Placeholder model with no database table, but with django admin page
and contenttype permission"""

class Meta:
managed = False # not in Django's database
default_permissions = ()
Expand Down
6 changes: 6 additions & 0 deletions django_rq/templates/django_rq/stats.html
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
<th>Host</th>
<th>Port</th>
<th>DB</th>
{% if queue.scheduler_pid is not False %}
<th>Scheduler PID</th>
{% endif%}
</tr>
</thead>
<tbody>
Expand Down Expand Up @@ -84,6 +87,9 @@
<td>{{ queue.connection_kwargs.host }}</td>
<td>{{ queue.connection_kwargs.port }}</td>
<td>{{ queue.connection_kwargs.db }}</td>
{% if queue.scheduler_pid is not False %}
<td>{{ queue.scheduler_pid|default_if_none:"Inactive" }}</td>
{% endif %}
</tr>
{% endfor %}
</tbody>
Expand Down
24 changes: 24 additions & 0 deletions django_rq/tests/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,30 @@
'PORT': 6379,
'DB': 0,
},
'scheduler_scheduler_active_test': {
'HOST': REDIS_HOST,
'PORT': 6379,
'DB': 0,
'ASYNC': False,
},
'scheduler_scheduler_inactive_test': {
'HOST': REDIS_HOST,
'PORT': 6379,
'DB': 0,
'ASYNC': False,
},
'worker_scheduler_active_test': {
'HOST': REDIS_HOST,
'PORT': 6379,
'DB': 0,
'ASYNC': False,
},
'worker_scheduler_inactive_test': {
'HOST': REDIS_HOST,
'PORT': 6379,
'DB': 0,
'ASYNC': False,
},
'django-redis': {
'USE_REDIS_CACHE': 'default',
},
Expand Down
82 changes: 81 additions & 1 deletion django_rq/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from django_rq import thread_queue
from django_rq.templatetags.django_rq import force_escape, to_localtime
from django_rq.tests.fixtures import DummyJob, DummyQueue, DummyWorker
from django_rq.utils import get_jobs, get_statistics
from django_rq.utils import get_jobs, get_statistics, get_scheduler_pid
from django_rq.workers import get_worker, get_worker_class

try:
Expand Down Expand Up @@ -808,6 +808,86 @@ def test_force_escape_regular_string(self):
self.assertEqual(escaped_string, expected)


class SchedulerPIDTest(TestCase):
@skipIf(RQ_SCHEDULER_INSTALLED is False, 'RQ Scheduler not installed')
def test_scheduler_scheduler_pid_active(self):
test_queue = 'scheduler_scheduler_active_test'
queues = [{
'connection_config': {
'DB': 0,
'HOST': 'localhost',
'PORT': 6379,
},
'name': test_queue,
}]
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
scheduler = get_scheduler(test_queue)
scheduler.register_birth()
self.assertIs(get_scheduler_pid(get_queue(scheduler.queue_name)), False)
scheduler.register_death()

@skipIf(RQ_SCHEDULER_INSTALLED is False, 'RQ Scheduler not installed')
def test_scheduler_scheduler_pid_inactive(self):
test_queue = 'scheduler_scheduler_inactive_test'
queues = [{
'connection_config': {
'DB': 0,
'HOST': 'localhost',
'PORT': 6379,
},
'name': test_queue,
}]
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
connection = get_connection(test_queue)
connection.flushall() # flush is needed to isolate from other tests
scheduler = get_scheduler(test_queue)
scheduler.remove_lock()
scheduler.register_death() # will mark the scheduler as death so get_scheduler_pid will return None
self.assertIs(get_scheduler_pid(get_queue(scheduler.queue_name)), False)

@skipIf(RQ_SCHEDULER_INSTALLED is True, 'RQ Scheduler installed (no worker--with-scheduler)')
def test_worker_scheduler_pid_active(self):
'''The worker works as scheduler too if RQ Scheduler not installed, and the pid scheduler_pid is correct'''
test_queue = 'worker_scheduler_active_test'
queues = [{
'connection_config': {
'DB': 0,
'HOST': 'localhost',
'PORT': 6379,
},
'name': test_queue,
}]
with patch('rq.scheduler.RQScheduler.release_locks') as mock_release_locks:
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
queue = get_queue(test_queue)
worker = get_worker(test_queue, name=uuid4().hex)
worker.work(with_scheduler=True, burst=True) # force the worker to acquire a scheduler lock
pid = get_scheduler_pid(queue)
self.assertIsNotNone(pid)
self.assertIsNot(pid, False)
self.assertIsInstance(pid, int)

@skipIf(RQ_SCHEDULER_INSTALLED is True, 'RQ Scheduler installed (no worker--with-scheduler)')
def test_worker_scheduler_pid_inactive(self):
'''The worker works as scheduler too if RQ Scheduler not installed, and the pid scheduler_pid is correct'''
test_queue = 'worker_scheduler_inactive_test'
queues = [{
'connection_config': {
'DB': 0,
'HOST': 'localhost',
'PORT': 6379,
},
'name': test_queue,
}]
with patch('django_rq.utils.QUEUES_LIST',
new_callable=PropertyMock(return_value=queues)):
worker = get_worker(test_queue, name=uuid4().hex)
worker.work(with_scheduler=False, burst=True) # worker will not acquire lock, scheduler_pid should return None
self.assertIsNone(get_scheduler_pid(worker.queues[0]))

class UtilsTest(TestCase):
def test_get_statistics(self):
"""get_statistics() returns the right number of workers"""
Expand Down
26 changes: 25 additions & 1 deletion django_rq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,32 @@
from rq.worker import Worker
from rq.worker_registration import clean_worker_registry

from .queues import get_connection, get_queue_by_index

from .queues import get_connection, get_queue_by_index, get_scheduler
from .settings import QUEUES_LIST
from .templatetags.django_rq import to_localtime
from django.core.exceptions import ImproperlyConfigured

def get_scheduler_pid(queue):
'''Checks whether there's a scheduler-lock on a particular queue, and returns the PID.
It Only works with RQ's Built-in RQScheduler.
When RQ-Scheduler is available returns False
If not, it checks the RQ's RQScheduler for a scheduler lock in the desired queue
Note: result might have some delay (1-15 minutes) but it helps visualizing whether the setup is working correcly
'''
try:
# first try get the rq-scheduler
scheduler = get_scheduler(queue.name) # should fail if rq_scheduler not present
return False # Not possible to give useful information without creating a performance issue (redis.keys())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will cause get_scheduler_pid to always return False if rq_scheduler is installed, even though the built in scheduler is used.

I think we should just skip the check for external scheduler altogether.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, ok. In my tests, running both --with-scheduler and rq-scheduler doesn't end up well.
By returning False, I'm basically disabling the feature, since it won't be shown in the dashboard.

After Some digging I found that, for some reason, rq and rq-scheduler organize scheduling and enqueuing of scheduled jobs in parallel ways, namely incompatible.

Please correct me if I'm wrong: Basically, jobs scheduled using Scheduler.enqueue_at will never be picked up by a worker --with-scheduler (different redis keys), and also the opposite is true. I wouldn't understand anyone having both running (and if they do, the worker will be worker only).

Recently, @cunla's django-rq-scheduler (>2022.12.1) transitioned to --with-scheduler.

My point is: showing that the Scheduler is active (worker --with-scheduler) and then scheduling jobs using rq-scheduler enqueue_at will cause stale jobs that never run and are never seen by django-rq.

What I propose is your original idea but only working if the rq-scheduler is not to be found, otherwise, the information would be misleading, to say the least.

Let me know

Copy link
Contributor Author

@gabriels1234 gabriels1234 Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I propose is your original idea but only working if the rq-scheduler is not to be found, otherwise, the information would be misleading, to say the least.

@selwin as I mentioned, I prefer not to activate the feature for rq-scheduler, because it would be misleading.
I think the PR is ready to move forward. (At least to ship it as a half-feature)

However, I'm happy to make it according to your opinion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, let's drop rq-scheduler altogether as it will only make things confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@selwin great! So the PR is good as it is. do you need to see a screenshot?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes please, a screenshot would be good just to make sure that it works :)

except ImproperlyConfigured:
from rq.scheduler import RQScheduler
# When a scheduler acquires a lock it adds an expiring key: (e.g: rq:scheduler-lock:<queue.name>)
#TODO: (RQ>= 1.13) return queue.scheduler_pid
pid = queue.connection.get(RQScheduler.get_locking_key(queue.name))
return int(pid.decode()) if pid is not None else None
except Exception as e:
pass # Return None
return None


def get_statistics(run_maintenance_tasks=False):
Expand Down Expand Up @@ -47,6 +70,7 @@ def get_statistics(run_maintenance_tasks=False):
'oldest_job_timestamp': oldest_job_timestamp,
'index': index,
'connection_kwargs': connection_kwargs,
'scheduler_pid': get_scheduler_pid(queue),
}

connection = get_connection(queue.name)
Expand Down