Worker pool handles spawned process #655

Draft
wants to merge 7 commits into
base: master
15 changes: 13 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -14,6 +14,17 @@ jobs:
build:
runs-on: ubuntu-latest
name: Python${{ matrix.python-version }}/Django${{ matrix.django-version }}

services:
postgres:
image: postgres:15
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: django_rq_test_db
ports:
- 5432:5432

strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
@@ -34,8 +45,8 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install django==${{ matrix.django-version }}
pip install redis django-redis rq sentry-sdk rq-scheduler
pip install redis django-redis rq sentry-sdk rq-scheduler psycopg2-binary

- name: Run Test
run: |
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=.
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. --noinput
12 changes: 8 additions & 4 deletions django_rq/management/commands/rqworker-pool.py
@@ -1,16 +1,17 @@
import multiprocessing as mp
import os
import sys

from rq.serializers import resolve_serializer
from rq.worker_pool import WorkerPool
from rq.logutils import setup_loghandlers
from rq.serializers import resolve_serializer

from django.core.management.base import BaseCommand

from ...jobs import get_job_class
from ...utils import configure_sentry
from ...utils import configure_sentry, reset_db_connections
from ...queues import get_queues
from ...workers import get_worker_class
from ...worker_pool import DjangoWorkerPool


class Command(BaseCommand):
@@ -89,12 +90,15 @@ def handle(self, *args, **options):
worker_class = get_worker_class(options.get('worker_class', None))
serializer = resolve_serializer(options['serializer'])

pool = WorkerPool(
pool = DjangoWorkerPool(
queues=queues,
connection=queues[0].connection,
num_workers=options['num_workers'],
serializer=serializer,
worker_class=worker_class,
job_class=job_class,
)
# Close any opened DB connection before any fork
reset_db_connections()
mp.set_start_method('fork', force=True)
pool.start(burst=options.get('burst', False), logging_level=logging_level)
11 changes: 9 additions & 2 deletions django_rq/tests/settings.py
@@ -39,8 +39,15 @@

DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'django_rq_db',
'USER': 'postgres',
'PASSWORD': 'postgres',
'HOST': 'localhost',
'PORT': '5432',
'TEST': {
'NAME': 'django_rq_test_db',
}
},
}

17 changes: 16 additions & 1 deletion django_rq/tests/tests.py
@@ -1,5 +1,6 @@
import sys
import datetime
import multiprocessing
import sys
import time
from unittest import skipIf, mock
from unittest.mock import patch, PropertyMock, MagicMock
@@ -37,6 +38,8 @@
from django_rq.utils import get_jobs, get_statistics, get_scheduler_pid
from django_rq.workers import get_worker, get_worker_class

from .utils import query_user

try:
from rq_scheduler import Scheduler
from ..queues import get_scheduler
@@ -303,6 +306,18 @@ def test_pass_queue_via_commandline_args(self):
self.assertTrue(job['job'].is_finished)
self.assertIn(job['job'].id, job['finished_job_registry'].get_job_ids())

def test_rqworker_pool_process_start_method(self) -> None:
for start_method in ['spawn', 'fork']:
with mock.patch.object(multiprocessing, 'get_start_method', return_value=start_method):
queue_name = 'django_rq_test'
queue = get_queue(queue_name)
job = queue.enqueue(query_user)
finished_job_registry = FinishedJobRegistry(queue.name, queue.connection)
call_command('rqworker-pool', queue_name, burst=True)

self.assertTrue(job.is_finished)
self.assertIn(job.id, finished_job_registry.get_job_ids())

def test_configure_sentry(self):
rqworker.configure_sentry('https://1@sentry.io/1')
self.mock_sdk.init.assert_called_once_with(
14 changes: 13 additions & 1 deletion django_rq/tests/utils.py
@@ -1,7 +1,11 @@
from typing import Optional

from django_rq.queues import get_connection, get_queue_by_index

from django.contrib.auth.models import User


def get_queue_index(name='default'):
def get_queue_index(name: str = 'default') -> int:
"""
Returns the position of Queue for the named queue in QUEUES_LIST
"""
@@ -17,3 +21,11 @@ def get_queue_index(name='default'):
queue_index = i
break
return queue_index


def query_user() -> Optional[User]:
    try:
        return User.objects.first()
    except Exception as e:
        print('Exception caught when querying user: ', e)
        raise e
38 changes: 38 additions & 0 deletions django_rq/worker_pool.py
@@ -0,0 +1,38 @@
import django
from multiprocessing import Process, get_start_method
from typing import Any

from rq.worker_pool import WorkerPool, run_worker


class DjangoWorkerPool(WorkerPool):
    def get_worker_process(
        self,
        name: str,
        burst: bool,
        _sleep: float = 0,
        logging_level: str = "INFO",
    ) -> Process:
        """Returns the worker process"""
        return Process(
            target=run_django_worker,
            args=(name, self._queue_names, self._connection_class, self._pool_class, self._pool_kwargs),
            kwargs={
                '_sleep': _sleep,
                'burst': burst,
                'logging_level': logging_level,
                'worker_class': self.worker_class,
                'job_class': self.job_class,
                'serializer': self.serializer,
            },
            name=f'Worker {name} (WorkerPool {self.name})',
        )


def run_django_worker(*args: Any, **kwargs: Any) -> None:
Contributor

FWIW, the version of this function we're using in production was:

def run_django_worker(*args: Any, **kwargs: Any) -> None:
    alias = "default"

    try:
        connections[alias].close()
    except Exception:
        logger.error(f"Worker could not get a connection to the database with alias {alias}")

    run_worker(*args, **kwargs)

The exception was transient; catching and logging it didn't stop us from using a 16-worker queue to process a few hundred thousand events per day.
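
The pattern in the snippet above (close whatever database connection the child inherited, log if closing fails, then hand off to the worker) can be sketched without Django so that it runs on its own. The helper name `run_with_fresh_connections` and its two callable parameters are hypothetical and not part of rq or django-rq; in a real project the closer would presumably be something like `django.db.connections.close_all` and the runner `rq.worker_pool.run_worker`.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


def run_with_fresh_connections(
    close_connections: Callable[[], None],
    run_worker: Callable[..., None],
    *args: Any,
    **kwargs: Any,
) -> None:
    """Close inherited connections, then run the worker (hypothetical helper)."""
    try:
        # Drop connections inherited from the parent so the worker opens its own.
        close_connections()
    except Exception:
        # As in the comment above: log and carry on rather than abort the worker.
        logger.exception("Could not close inherited database connections")

    run_worker(*args, **kwargs)


# Stand-ins for the real closer and runner, used only for illustration.
events = []
run_with_fresh_connections(
    lambda: events.append("closed"),
    lambda *a, **kw: events.append(("ran", a, kw)),
    "worker-1",
    burst=True,
)
```

The helper is kept at module level rather than built as a closure because a `Process` target has to be picklable when the start method is `spawn`.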

Contributor

The django.setup() solution is likely better (but I don't understand it so can't offer feedback)

Contributor

I just checked production (Heroku, Unix) and get_start_method() returned "fork". I know that simply calling run_worker() will not work in that environment because of the psycopg issues with a DB connection inherited from before the fork.

(Maybe this explains the failing CI test?)

Contributor Author

Yes, I just realized that db connections are also closed pre-fork in the rqworker management command.

# Close any opened DB connection before any fork
reset_db_connections()

I called django.setup() because the spawned process doesn't load the installed apps at all. This worked fine when I tested it manually, but it fails in the unit test. That's probably because django.setup() doesn't set up the project for the test environment, so the spawned process ends up with its own DB instance.

I'm currently using this django code for parallel test runner as a reference. I'll try to update the PR in a day or two.
https://github.com/django/django/blob/53719d6b5b745dd99b1ab9315afb242f706ebbf1/django/test/runner.py#L424-L432
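
One thing that may be worth ruling out in the new test (an observation about Python name binding, not a confirmed diagnosis of the CI failure): the test patches `multiprocessing.get_start_method`, while `django_rq/worker_pool.py` imports the function by name via `from multiprocessing import Process, get_start_method`. A name bound that way keeps pointing at the original function, so a patch applied to the module attribute does not reach it. The sketch below shows the difference; the two helper functions and the `"patched"` sentinel value are made up for illustration.

```python
import multiprocessing
from multiprocessing import get_start_method  # bound by name at import time
from unittest import mock


def via_module_attribute() -> str:
    # Looks the function up on the module at call time, so a patch is visible.
    return multiprocessing.get_start_method()


def via_imported_name() -> str:
    # Uses the name bound by the `from ... import`, which the patch leaves alone.
    return get_start_method()


with mock.patch.object(multiprocessing, "get_start_method", return_value="patched"):
    seen_via_module = via_module_attribute()  # sees the mock
    seen_via_name = via_imported_name()       # still calls the real function
```

If this applies here, the usual remedy is to patch the name where it is looked up (for example `django_rq.worker_pool.get_start_method`), which at least makes the patch visible in the process that applies it.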

    # multiprocessing library default process start method may be
    # `spawn` or `fork` depending on the host OS
    if get_start_method() == 'spawn':
        django.setup()

    run_worker(*args, **kwargs)
Collaborator

Similar to @jackkinsella's comment, we need to use our own run_worker command that calls reset_db_connections before running the original run_worker command:

def reset_db_connections():