[PP-2130] celery backend config #2282

Open · dbernstein wants to merge 11 commits into main from PP-2130-celery-backend-config

Conversation

dbernstein (Contributor)

Description

This PR enables the result backend in Celery. It should cause no change to the behavior of the system.
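For context, enabling the result backend generally means setting `result_backend` alongside the existing `broker_url`; a minimal sketch with placeholder URLs (not this PR's actual values):

```python
from celery import Celery

# Illustrative sketch only: a result backend gives Celery somewhere to
# store task return values, so callers can inspect them via AsyncResult.
app = Celery("palace")
app.conf.update(
    broker_url="redis://localhost:6379/0",      # where task messages are queued
    result_backend="redis://localhost:6379/2",  # where task results are stored
)
```

With a backend configured, `some_task.delay().get()` can retrieve a task's return value instead of discarding it.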

Motivation and Context

https://ebce-lyrasis.atlassian.net/browse/PP-2130

How Has This Been Tested?

I tested this update with my Celery-based Axis 360 importer and reaper tasks, which will follow after this one is merged.

Checklist

  • I have updated the documentation accordingly.
  • All new and existing tests passed.

dbernstein changed the title from "Pp 2130 celery backend config" to "[PP-2130] celery backend config" on Feb 12, 2025

codecov bot commented Feb 12, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 91.12%. Comparing base (20d9aaa) to head (3af816f).

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2282   +/-   ##
=======================================
  Coverage   91.12%   91.12%           
=======================================
  Files         363      363           
  Lines       41340    41342    +2     
  Branches     8848     8848           
=======================================
+ Hits        37670    37672    +2     
  Misses       2408     2408           
  Partials     1262     1262           


@jonathangreen (Member) left a comment:

A few things to take a look at here.

@@ -29,6 +29,10 @@ def my_task(task: Task) -> None:
...
```

NB: It the task does not return a result, you must either call .get() or .ignore_result() on the result or
jonathangreen (Member):

Typo in the comment:

NB: ~~It~~ If the task does not return a result ...

jonathangreen (Member):

I have a question about this as well: can we just configure results to be ignored by default, instead of adding this parameter to the decorator everywhere, or would that have adverse consequences?

dbernstein (Contributor, Author):

Yes, that is the way to go, but I ran into issues trying to set it up that way. I think the problem was that I was using the wrong key when overriding it (which you pointed out below).
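For reference, a sketch of the global-default approach being discussed, using stock Celery settings (the app and task here are illustrative):

```python
from celery import Celery, shared_task

app = Celery("example")
# Ignore results for every task by default, instead of annotating
# each task's decorator individually.
app.conf.task_ignore_result = True


# A task that genuinely needs its return value can opt back in:
@shared_task(ignore_result=False)
def task_with_result() -> str:
    return "value"
```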

@@ -80,7 +80,8 @@ def mock_task(

def configure_app(
self,
broker_url: str = "redis://testtesttest:1234/1",
broker_url: str = "redis://testtesttest:1234/0",
jonathangreen (Member):

Why update the broker_url here?

dbernstein (Contributor, Author):

I thought it more consistent to have the database component of the test broker URL mirror the production pattern; /0 is used for the broker URL in production.

@@ -80,7 +80,8 @@ def mock_task(

def configure_app(
self,
broker_url: str = "redis://testtesttest:1234/1",
broker_url: str = "redis://testtesttest:1234/0",
result_backend: str = "redis://testtesttest:1234/0",
jonathangreen (Member):

Shouldn't result_backend and broker_url have different URLs?

dbernstein (Contributor, Author):

Yes, they probably should, but they don't have to.

dbernstein (Contributor, Author):

I will fix that.
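For readers following along: the trailing path component of a Redis URL selects the logical database, so both settings can share one server while keeping their keyspaces separate. Hypothetical test values:

```python
# The trailing /N in a Redis URL selects the logical database, so these
# two settings share one server but keep broker traffic and results apart.
broker_url = "redis://testtesttest:1234/0"      # DB 0: message broker
result_backend = "redis://testtesttest:1234/1"  # DB 1: result backend
```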

return partial(
CeleryConfiguration,
broker_url="redis://test.com:6379/0",
result_backend="redis://test.com:6379/0",
jonathangreen (Member):

Shouldn't these be different URLs?

dbernstein (Contributor, Author):

Yes, they should.

@@ -39,7 +40,11 @@ def celery_pydantic_config() -> CeleryConfiguration:

The config returned will then be used to configure the `celery_app` fixture.
"""
-return CeleryConfiguration.model_construct(broker_url="memory://")
+temp_dir = tempfile.mkdtemp(prefix="celery_test_backend")
jonathangreen (Member):

Since this is never cleaned up, doesn't this leak temp files on every run?

dbernstein (Contributor, Author):

Yes - that's a good point.
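One possible fix, sketched with pytest's built-in `tmp_path` fixture (assuming `CeleryConfiguration` is the project's pydantic settings model shown in the diff):

```python
import pytest


@pytest.fixture
def celery_pydantic_config(tmp_path) -> CeleryConfiguration:
    # tmp_path is created per test and pruned automatically by pytest,
    # so temp directories no longer accumulate across runs.
    return CeleryConfiguration.model_construct(
        broker_url="memory://", result_backend=f"file://{tmp_path}"
    )
```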

temp_dir = tempfile.mkdtemp(prefix="celery_test_backend")

return CeleryConfiguration.model_construct(
broker_url="memory://", result_backend=f"file://{temp_dir}"
jonathangreen (Member):

The official Celery docs suggest using cache+memory:// as a backend in tests. Did you try that? It might be preferable to a temp file.

See:
https://github.com/celery/celery/blob/2b7c83db4c6af9504e29e4702f4fa299e7605ff8/celery/contrib/testing/app.py#L11-L20

dbernstein (Contributor, Author):

I didn't see that. That would be much better - I'll fix.
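A sketch of the fixture with Celery's suggested in-memory backend (same `CeleryConfiguration` assumption as above):

```python
@pytest.fixture
def celery_pydantic_config() -> CeleryConfiguration:
    # "cache+memory://" is the in-process backend used by Celery's own
    # testing app; nothing touches disk, so there is nothing to clean up.
    return CeleryConfiguration.model_construct(
        broker_url="memory://", result_backend="cache+memory://"
    )
```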

{
"redis": {
"url": self.config.url,
"backend": self.config.url,
jonathangreen (Member):

Why does a backend key need to be set here? It doesn't seem like Redis needs it.

dbernstein (Contributor, Author):

I'm not sure why I put that in there. I'll see if I can take it out.

README.md Outdated

- `PALACE_CELERY_BROKER_URL`: The URL of the broker to use for Celery. (**required**).
- for example:
```sh
export PALACE_CELERY_BROKER_URL="redis://localhost:6379/0"`

```
- `PALACE_CELERY_RESULT_BACKEND`: The url of the  result backend to use for Celery. (**required**).
jonathangreen (Member):

Minor: looks like there are two spaces between "the" and "result".

Suggested change
- `PALACE_CELERY_RESULT_BACKEND`: The url of the  result backend to use for Celery. (**required**).
- `PALACE_CELERY_RESULT_BACKEND`: The url of the result backend to use for Celery. (**required**).
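Presumably the README entry would also carry an example export mirroring the broker one; the database number here is purely illustrative:

```sh
export PALACE_CELERY_RESULT_BACKEND="redis://localhost:6379/2"
```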

@@ -40,6 +40,6 @@ def run(self) -> None:
collection.delete()


-@shared_task(queue=QueueNames.high, bind=True)
+@shared_task(queue=QueueNames.high, bind=True, task_ignore_results=True)
@jonathangreen (Member) commented Feb 12, 2025:

I don't see `task_ignore_results` anywhere in the Celery documentation. Is that the parameter we need to be passing here? From the docs it looks like `ignore_result` is what we want: https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.ignore_result

dbernstein (Contributor, Author):

Ah, that explains a lot. I'm going to try setting `task_ignore_result=True` in the config and remove all of what should have been `ignore_result`. Hopefully that will give us the default behavior we want with the backend enabled.
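To spell out the naming distinction in stock Celery: `task_ignore_result` (singular) is the app-level setting, `ignore_result` is the per-task option, and an unrecognized keyword like `task_ignore_results` appears to be stored as an inert attribute on the task class rather than raising an error. A sketch, with a hypothetical task name (`app` and `QueueNames` come from the project code quoted above):

```python
from celery import Task, shared_task

# App-level default, set once in configuration:
app.conf.task_ignore_result = True


# Per-task option that Celery actually reads from the decorator:
@shared_task(queue=QueueNames.high, bind=True, ignore_result=True)
def fire_and_forget_task(task: Task) -> None:
    ...
```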

dbernstein force-pushed the PP-2130-celery-backend-config branch from 06362d6 to 5efd3a4 on February 13, 2025 at 20:47
dbernstein (Contributor, Author):

Ach, this is super frustrating:

With the default value of `task_ignore_result` (i.e., False), the tests pass. But if I set `task_ignore_result` to True, many of the tests for existing tasks fail like this:

________________ ERROR at setup of test_collection_delete_task _________________
  [gw1] linux -- Python 3.10.16 /home/runner/work/circulation/circulation/.tox/py310-docker/bin/python
  
  request = <SubRequest 'celery_worker' for <Function test_collection_delete_task>>
  celery_app = <Celery celery.tests at 0x7f28dddfd2a0>
  celery_includes = ('palace.manager.celery.app',), celery_worker_pool = 'solo'
  celery_worker_parameters = {'shutdown_timeout': 30.0}
  
      @pytest.fixture()
      def celery_worker(request,
                        celery_app,
                        celery_includes,
                        celery_worker_pool,
                        celery_worker_parameters):
          # type: (Any, Celery, Sequence[str], str, Any) -> WorkController
          """Fixture: Start worker in a thread, stop it when the test returns."""
          from .testing import worker
      
          if not NO_WORKER:
              for module in celery_includes:
                  celery_app.loader.import_task_module(module)
  >           with worker.start_worker(celery_app,
                                       pool=celery_worker_pool,
                                       **celery_worker_parameters) as w:
  
  .tox/py310-docker/lib/python3.10/site-packages/celery/contrib/pytest.py:207: 
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  /opt/hostedtoolcache/Python/3.10.16/x64/lib/python3.10/contextlib.py:135: in __enter__
      return next(self.gen)
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  
  app = <Celery celery.tests at 0x7f28dddfd2a0>, concurrency = 1, pool = 'solo'
  loglevel = 'error', logfile = None, perform_ping_check = True
  ping_task_timeout = 10.0, shutdown_timeout = 30.0, kwargs = {}
  worker = <Worker: gen3817@fv-az1712-345 (terminating)>
  ping = <@task: celery.ping of celery.tests at 0x7f28dddfd2a0>
  
      @contextmanager
      def start_worker(
          app,  # type: Celery
          concurrency=1,  # type: int
          pool='solo',  # type: str
          loglevel=WORKER_LOGLEVEL,  # type: Union[str, int]
          logfile=None,  # type: str
          perform_ping_check=True,  # type: bool
          ping_task_timeout=10.0,  # type: float
          shutdown_timeout=10.0,  # type: float
          **kwargs  # type: Any
      ):
          # type: (...) -> Iterable
          """Start embedded worker.
      
          Yields:
              celery.app.worker.Worker: worker instance.
          """
          test_worker_starting.send(sender=app)
      
          worker = None
          try:
              with _start_worker_thread(app,
                                        concurrency=concurrency,
                                        pool=pool,
                                        loglevel=loglevel,
                                        logfile=logfile,
                                        perform_ping_check=perform_ping_check,
                                        shutdown_timeout=shutdown_timeout,
                                        **kwargs) as worker:
                  if perform_ping_check:
                      from .tasks import ping
                      with allow_join_result():
  >                       assert ping.delay().get(timeout=ping_task_timeout) == 'pong'
  E                       AssertionError
  
  .tox/py310-docker/lib/python3.10/site-packages/celery/contrib/testing/worker.py:126: AssertionError

However, if I set `task_ignore_result` to False and try to override an individual task (such as collection_delete) with `ignore_result=True`, it also fails. So this PR is a bit stuck.
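One possible workaround, suggested by the traceback above: the embedded worker's startup ping calls `.get()` on its own result, which can never succeed when results are ignored globally, but the ping check can be skipped by overriding Celery's `celery_worker_parameters` fixture (the log shows its kwargs being forwarded straight to `start_worker`):

```python
import pytest


@pytest.fixture(scope="session")
def celery_worker_parameters() -> dict:
    # These kwargs pass through to start_worker(); disabling the ping
    # check avoids the .get() that globally-ignored results break.
    return {"perform_ping_check": False, "shutdown_timeout": 30.0}
```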

@jonathangreen (Member):

That is annoying. You could take a look at that testing repo I pointed to in one of the comments. There might be some insight there.

It's always a bit of work to get new components like this set up and testable.
