Database Errors When Running rqworker-pool #650

Open
GhalebKhaled opened this issue Feb 27, 2024 · 14 comments
Comments

@GhalebKhaled

I'm trying to switch from running separate workers for different queues to a worker pool, using the command below:

./manage.py rqworker-pool main default secondary --num-workers 2 --worker-class rq.worker.HerokuWorker --settings settings.rq_settings

I'm getting database errors:

 File "/app/.heroku/python/lib/python3.11/site-packages/django/db/models/sql/compiler.py", line 1562, in execute_sql
    cursor.execute(sql, params)
  File "/app/.heroku/python/lib/python3.11/site-packages/sentry_sdk/integrations/django/__init__.py", line 641, in execute
    return real_execute(self, sql, params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/backends/utils.py", line 67, in execute
    return self._execute_with_wrappers(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/backends/utils.py", line 80, in _execute_with_wrappers
    return executor(sql, params, many, context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/backends/utils.py", line 84, in _execute
    with self.db.wrap_database_errors:
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/utils.py", line 91, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/backends/utils.py", line 89, in _execute
    return self.cursor.execute(sql, params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
django.db.utils.OperationalError: SSL SYSCALL error: EOF detected

A different database error appears if I use SimpleWorker:

File "/app/.heroku/python/lib/python3.11/site-packages/django/db/models/query.py", line 380, in __len__
    self._fetch_all()
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/models/query.py", line 1881, in _fetch_all
    self._result_cache = list(self._iterable_class(self))
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/models/query.py", line 91, in __iter__
    results = compiler.execute_sql(
              ^^^^^^^^^^^^^^^^^^^^^
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/models/sql/compiler.py", line 1560, in execute_sql
    cursor = self.connection.cursor()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.heroku/python/lib/python3.11/site-packages/django/utils/asyncio.py", line 26, in inner
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/backends/base/base.py", line 330, in cursor
    return self._cursor()
           ^^^^^^^^^^^^^^
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/backends/base/base.py", line 307, in _cursor
    with self.wrap_database_errors:
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/utils.py", line 91, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/backends/base/base.py", line 308, in _cursor
    return self._prepare_cursor(self.create_cursor(name))
                                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.heroku/python/lib/python3.11/site-packages/django/utils/asyncio.py", line 26, in inner
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/app/.heroku/python/lib/python3.11/site-packages/django/db/backends/postgresql/base.py", line 330, in create_cursor
    cursor = self.connection.cursor()
             ^^^^^^^^^^^^^^^^^^^^^^^^
django.db.utils.InterfaceError: connection already closed

This happens on Heroku (Ubuntu); the database is Heroku Postgres 13.13.
python 3.11.0
django==4.2.8
django-redis==5.4.0
rq==1.15.1
django-rq==2.10.1

RQ works fine with a single worker for each queue; these errors happen only when switching to worker-pool. Is there anything I need to change in the database configuration?

@jackkinsella
Contributor

jackkinsella commented Mar 1, 2024

To add to this: worker-pool works fine as long as the worker count is only 1

E.g. This works

 python ./mysite/manage.py rqworker-pool default secondary --worker-class rq.worker.SimpleWorker --settings mysite.settings.rq_settings --num-workers 1

But this fails

 python ./mysite/manage.py rqworker-pool default secondary notifications --worker-class rq.worker.SimpleWorker --settings mysite.settings.rq_settings --num-workers 2

A relevant hint: this may be caused by how fork interacts with psycopg2. See https://virtualandy.wordpress.com/2019/09/04/a-fix-for-operationalerror-psycopg2-operationalerror-ssl-error-decryption-failed-or-bad-record-mac/

@jackkinsella
Contributor

Another set of hints is found in this article:

https://medium.com/@philamersune/fixing-ssl-error-decryption-failed-or-bad-record-mac-d668e71a5409

"The SSL error: decryption failed or bad record mac occurs either when the certificate is invalid or the message hash value has been tampered; in our case it’s because of the latter. Django creates a single database connection when it tries to query for the first time. Any subsequent calls to the database will use this existing connection until it is expired or closed, in which it will automatically create a new one the next time you query. The PostgreSQL engine in Django uses psycopg to talk to the database; according to the document it is level 2 thread safe. Unfortunately, the timeout() method is using multiprocessing module and therefore tampers the SSL MAC. There are different ways to fix this. We can either (1) use basic threads instead of spawning a new process or (2) use a new database connection in the timeout() method. We can also (3) scrap the timeout() method altogether and handle the async task properly via Celery."
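To make the fork problem concrete, here is a minimal stdlib-only sketch (not Django or psycopg2 code) showing that a forked child writes to the very same socket as its parent. The function names are made up for illustration, and it assumes a POSIX system where the "fork" start method is available. As far as I understand it, this sharing is what breaks an SSL connection: each process keeps its own copy of the encryption state but both write into one stream.

```python
import multiprocessing as mp
import socket


def child_send(sock, message):
    # The child writes to the socket object it inherited from the parent.
    sock.sendall(message)


def demo_shared_socket():
    # "fork" is POSIX-only; the child gets a copy of the parent's file descriptors.
    ctx = mp.get_context("fork")
    parent_end, peer_end = socket.socketpair()

    proc = ctx.Process(target=child_send, args=(parent_end, b"child"))
    proc.start()
    proc.join()

    # The parent now writes to the same underlying socket.
    parent_end.sendall(b"parent")

    # The peer sees one byte stream containing data from both processes.
    expected_length = len(b"child") + len(b"parent")
    data = b""
    while len(data) < expected_length:
        data += peer_end.recv(64)

    parent_end.close()
    peer_end.close()
    return data


if __name__ == "__main__":
    print(demo_shared_socket())
```

The peer receives the bytes from both processes on a single connection. With a plain socket that is harmless, but a database connection that tracks per-connection state has no way to know a second process is also using it.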

@jackkinsella
Contributor

jackkinsella commented Mar 1, 2024

I found a solution, but it's unclear how to integrate it into this set of libraries, since:

  1. It only affects Django and other ORMs (not pure RQ).
  2. Yet the code that needs to change seems to be in rq (which should remain agnostic to this).

The change would be to modify the start-up code for each new process - i.e. worker_pool.run_worker, which is the target of Process

https://github.com/rq/rq/blob/3ad86083c33ec28b81a07f94dafdcf1cd56429ea/rq/worker_pool.py#L243

The change is as follows: insert these lines at the position linked above (i.e. at the start of the run_worker function).

from django.db import connections
# another complication arises if someone is using a DB alias that is not default... I guess this would need to be configurable
connections["default"].close()

@jackkinsella
Contributor

Perhaps one API design solution would be to provide a way for users to override the WorkerPool class?

Another could be to provide a hook on_worker_pool_fork that users can override.

@selwin
Collaborator

selwin commented Mar 1, 2024

Ah ok. Django-RQ's rqworker management command closes all DB connections before running worker.work(), perhaps we'll need to do something similar here.

I can think of two options:

  1. RQ's worker-pool already accepts --worker-class argument. Django RQ can pass in its own Worker class that closes all DB connections prior to working. https://github.com/rq/rq/blob/master/rq/worker_pool.py#L42
  2. Django RQ creates its own WorkerPool class that closes all DB connections before running the workers.

I can provide hooks to make it easier, but it'd be better to have a working proof of concept so I know which hooks to provide. I was thinking we could do this via Worker.before_run().

@jackkinsella
Contributor

I was thinking about this a bit more and wanted to share more information:

  1. I wanted to confirm that the example I gave yesterday worked when I tried it on a Heroku server with many rq tasks. When the DB connection is torn down in the new process (i.e. post fork), it seems Django creates a new one.
  2. What I don't know is whether this has any impact "upstream" to the DB connections in the main process. This would only matter if ASYNC=False, so might be treated as an edge case to figure out at a later stage.

As for specific code, I ran something like this. I'm sure there are better ways to reduce duplication and make better use of sub-classing etc. - but it's a POC so you understand the direction:

from multiprocessing import Process
from typing import Optional
from uuid import uuid4

from django.db import connections
from rq.worker_pool import WorkerData, WorkerPool, run_worker


class Psycopg2CompatibleWorkerPool(WorkerPool):
    def start_worker(
        self,
        count: Optional[int] = None,
        burst: bool = True,
        _sleep: float = 0,
        logging_level: str = "INFO",
    ):
        """
        Starts a worker and adds the data to worker_datas.
        * sleep: waits for X seconds before creating worker, for testing purposes
        """
        name = uuid4().hex
        process = Process(
            target=run_worker_with_new_db_connection,
            args=(name, self._queue_names, self._connection_class, self._pool_class, self._pool_kwargs),
            kwargs={
                '_sleep': _sleep,
                'burst': burst,
                'logging_level': logging_level,
                'worker_class': self.worker_class,
                'job_class': self.job_class,
                'serializer': self.serializer,
            },
            name=f'Worker {name} (WorkerPool {self.name})',
        )
        process.start()
        worker_data = WorkerData(name=name, pid=process.pid, process=process)  # type: ignore
        self.worker_dict[name] = worker_data
        self.log.debug('Spawned worker: %s with PID %d', name, process.pid)


def run_worker_with_new_db_connection(*args, **kwargs):
    alias = "default"
    connections[alias].close()
    run_worker(*args, **kwargs)

@selwin
Collaborator

selwin commented Mar 3, 2024

@jackkinsella take a look at https://github.com/rq/rq/pull/2052/files . I think having this PR merged into RQ would provide a reasonable place for django-rq and other frameworks to hook into.

@jackkinsella
Contributor

@selwin Would the idea be that django_rq (or individual user code) could override get_worker_process and add their own implementation without needing to mess with the larger method?

@selwin
Collaborator

selwin commented Mar 3, 2024 via email

@jackkinsella
Contributor

So what would be some pseudo code if someone using the library wanted to override that function? I'm wondering if we need to provide an easy hook - e.g. some way for people to modify their Django settings file to specify a location for the override function?

@jackkinsella
Contributor

Well actually, I guess the Django RQ library would be the ideal place to override it. I believe all Django users will have this problem for all DBs, because of how SSL works after forking.

@selwin
Collaborator

selwin commented Mar 9, 2024

@jackkinsella I released RQ 1.16.1 with worker_pool.get_worker_process(). It would be great if someone can create a PR implementing this in django-RQ.
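For anyone wondering what overriding such a method could look like, here is a rough stdlib-only sketch of the pattern. MiniPool, ResettingPool, run_worker and run_worker_after_reset are all hypothetical stand-ins written for this example; they are not RQ's WorkerPool or run_worker, and the real get_worker_process very likely takes different arguments. The point is only that once process creation sits in its own method, a subclass can swap in a target that does its cleanup (e.g. closing inherited DB connections) before the worker starts. It assumes a POSIX system with the "fork" start method.

```python
import multiprocessing as mp


def run_worker(name, events):
    # Stand-in for the real worker entry point.
    events.put(f"work:{name}")


def run_worker_after_reset(name, events):
    # This is where a Django project would close inherited DB connections.
    events.put(f"reset:{name}")
    run_worker(name, events)


class MiniPool:
    """Hypothetical, heavily simplified pool; not RQ's WorkerPool."""

    def __init__(self):
        self.ctx = mp.get_context("fork")
        self.events = self.ctx.Queue()

    def get_worker_process(self, name):
        # Process creation lives in one small, overridable method.
        return self.ctx.Process(target=run_worker, args=(name, self.events))

    def start_worker(self, name):
        process = self.get_worker_process(name)
        process.start()
        return process


class ResettingPool(MiniPool):
    def get_worker_process(self, name):
        # Only the target changes; start_worker is left untouched.
        return self.ctx.Process(
            target=run_worker_after_reset, args=(name, self.events)
        )


def demo():
    pool = ResettingPool()
    process = pool.start_worker("w1")
    first = pool.events.get(timeout=5)
    second = pool.events.get(timeout=5)
    process.join()
    return [first, second]


if __name__ == "__main__":
    print(demo())
```

The child reports the reset step before the work step, which is the ordering a Django integration would need: clean up whatever was inherited from the parent first, then hand over to the worker.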

@jackkinsella
Contributor

Sure, I can take this on!

@jackkinsella
Contributor

Added feedback here instead: https://github.com/rq/django-rq/pull/655/files
