Conversation

@Kytha (Contributor) commented Aug 30, 2024

An Airflow scheduler performance test highlighted a very hot piece of code in the celery executor when using a database results backend. This code seemed to be doing redundant work. Below is a flamegraph of the Airflow scheduler process captured by a statistical profiler (py-spy) during a period of heavy load (4000 tasks required scheduling).

[Flamegraph screenshot: Screenshot 2024-08-23 at 2 02 00 AM]

During this 1-minute profiler session, the scheduler spent 42% of its time nested within this line of code. The reason this code is so hot is that when using celery with a database results backend, celery will not pool database connections (unless the process is forked), and thus a new db connection must be established for each task in the loop. This is very expensive and scales with the number of tasks. We can see from the flamegraph that most of the time is spent creating database connections.

The solution put forth in this PR is to remove this operation entirely from the _process_tasks function. This is based on the justification that immediately after the celery executor processes tasks, the sync method of the celery executor will be called by its parent base executor to sync task state. In my view, this renders this line of code redundant.

When calling sync, the celery executor makes use of batch fetching and thus is more optimized.
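
For illustration, here is a rough sketch of the two access patterns (illustrative code only, written against celery's default database results backend schema - not the actual Airflow or Celery implementation):

```python
from celery import Celery
from celery.backends.database.models import Task
from sqlalchemy import create_engine, select


def per_task_states(app: Celery, task_ids: list[str]) -> dict[str, str]:
    # One (or more) results-backend round trips per task id; without
    # connection pooling, each round trip opens a brand new DB connection,
    # which is what dominates the flamegraph above.
    return {task_id: app.AsyncResult(task_id).state for task_id in task_ids}


def batched_states(result_backend_dburi: str, task_ids: list[str]) -> dict[str, str]:
    # One connection and one query for all task ids, roughly in the spirit
    # of the executor's batched sync path.
    engine = create_engine(result_backend_dburi)
    with engine.connect() as conn:
        rows = conn.execute(
            select(Task.task_id, Task.status).where(Task.task_id.in_(task_ids))
        )
        return dict(rows.all())
```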

Some additional deployment details:

Airflow Version: 2.9.2
Python Version: 3.11
Platform: Amazon MWAA
Celery results backend: PostgreSQL
Celery broker: Amazon SQS



boring-cyborg bot commented Aug 30, 2024

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst).
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://s.apache.org/airflow-slack

@o-nikolas (Contributor) left a comment

lgtm

@hussein-awala (Member) left a comment

Makes sense, LGTM

@eladkal (Contributor) commented Aug 30, 2024

Nice analysis. Though I think we need to check why this code is there to begin with; there may be a reason we are missing.

Also, this fix is based on the assumption:

> immediately after the celery executor processes tasks, the sync method of the celery executor will be called by its parent base executor to sync task state

If we accept it, I think we need a test to make sure this stays that way and won't be changed by future PRs.

@potiuk (Member) commented Sep 1, 2024

It's an interesting one. And yeah - nice analysis @Kytha!

Looks like the original code was supposed to handle "quick" tasks - but it did not take into account that - apparently - in order to check if the result is available, celery will indeed make a db call for EVERY async result, no matter what state the task is in - whether it is submitted, running, or completed. That's a very nasty side-effect if that's confirmed.

But I'd be rather surprised to see such a "huge" inefficiency in this check - I imagine it would have been found already if it were a "general" problem.

@Kytha - question - was your py-spy testing done with "real" tasks or ones that were doing nothing (i.e. pass-only tasks or similar)?

One possible reason why you might see such bad numbers is that you have very quick (pass) celery tasks and some of them manage to complete very quickly (but I do not know celery internals).

It would be a rather inefficient way of checking whether a task completed to call the DB for every single task. I intuitively would imagine that celery should not call the DB if the task has not completed yet (but maybe this is how it is implemented and there is no other side-channel to check it). I'd imagine you need to retrieve the result from the DB when the task completes, because of persistence and recovery, but if the task is still "running" I hardly see the need to reach out to the result backend (I imagine celery could check in the queue if the task was picked up and acknowledged way faster - without reaching out to the backend).

One reason why you would see such a big "percentage" of time there is that you made the tests with "pass-only" celery tasks and quite a big number of those tasks managed to complete before the check happens (but this is just a hypothesis).

Can you please elaborate on that?

@potiuk (Member) commented Sep 1, 2024

Also, another question:

https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/configurations-ref.html#result-backend-sqlalchemy-engine-options -> there is also a possible pool configuration, so the celery backend can use pooling. As you mentioned, NullPool is used only when the process is not forked. But... what does it mean that the process is not forked, and is that the case for task submission and result checking?
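
Something along these lines, with purely illustrative values (and, as discussed below, celery may still discard pool options in a non-forked process):

AIRFLOW__CELERY__RESULT_BACKEND_SQLALCHEMY_ENGINE_OPTIONS={"pool_size": 5, "pool_recycle": 1800}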

Again - I am not that deep into the celery interface - but I think the code you pointed at is called only on the worker side (i.e. when the worker wants to write status to the result backend) and it's not the same code that is used on the client side when the tasks are submitted and queried for status. Maybe there is another reason why in your case pooled connections are not used.

Also - another question - are you using pgbouncer for your result backend? Because I believe this might be the actual root cause of the overhead. It's indeed quite slow to open and close a connection to a postgres database directly - because it needs to fork a new process and reserve some memory - but if you have pgbouncer on a local network, opening and closing a connection to the pgbouncer instance should be super-quick, because pgbouncer does "real" DB connection pooling in this case. And PGBouncer is absolutely recommended for all "serious" Airflow installations on Postgres.

So even if "NullPool" is used in this case - that should be fine because in fact NullPool is even recommended if you have PGBouncer - for single-threaded programs: https://dba.stackexchange.com/questions/36828/how-to-best-use-connection-pooling-in-sqlalchemy-for-pgbouncer-transaction-level/118105#118105 and many other places.
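
(For illustration, an assumed client-side setup rather than anything Airflow-specific: with pgbouncer doing the real pooling, a NullPool engine pointed at the pgbouncer port stays cheap to open and close.)

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

# Placeholder DSN pointing at a local pgbouncer instance; pgbouncer keeps the
# "real" server connections pooled, so opening/closing client connections to
# it is cheap even though NullPool creates one per use.
engine = create_engine(
    "postgresql://airflow:password@pgbouncer:6432/celery_results",
    poolclass=NullPool,
)
```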

I am not against this change, however I think we need to understand better where the "huge" inefficiency you noticed comes from - I'd really find it quite strange, after so many years of Airflow using celery in multiple - even huge - installations, for it to go unnoticed, so I suspect what you see is a result of some environmental setup that simply "boosts" the hotness of that code artificially.

@potiuk (Member) left a comment

Requesting changes until we understand better where the "hotness" in that testing comes from - because IMHO, it should not be "that" hot.

@Kytha (Contributor, Author) commented Sep 4, 2024

> Nice analysis. Though I think we need to check why this code is there to begin with; there may be a reason we are missing.
>
> Also, this fix is based on the assumption:
>
> > immediately after the celery executor processes tasks, the sync method of the celery executor will be called by its parent base executor to sync task state
>
> If we accept it, I think we need a test to make sure this stays that way and won't be changed by future PRs.

My search into why this was added didn't yield much insight. It was added in #10949, but never directly addressed or explained beyond the original comment that was left. That PR generally seems to be a collection of separate changes related to task adoption.

I also agree; I can add a test to prevent regression of this assumption.

@Kytha (Contributor, Author) commented Sep 4, 2024

@potiuk Thanks for the in-depth review. I'll try to clarify a few things here.

For my test I was using 'dummy' tasks that would sleep for 1 minute each (4000 DAGs, 2 parallel tasks each, scheduled every 5 minutes).
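
(For reference, each test DAG looked roughly like the sketch below; the names and exact structure are hypothetical reconstructions, not my actual test code.)

```python
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def sleep_one_minute():
    time.sleep(60)  # "dummy" task that just sleeps for 1 minute


with DAG(
    dag_id="load_test_dag_0001",  # one of ~4000 similar DAG files
    start_date=datetime(2024, 8, 1),
    schedule="*/5 * * * *",  # scheduled every 5 minutes
    catchup=False,
) as dag:
    # two parallel tasks per DAG
    for i in range(2):
        PythonOperator(task_id=f"sleep_{i}", python_callable=sleep_one_minute)
```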

Also, the profiler results and stats attached are only a 1-minute sample from the runtime. I did take many samples and saw a similar trend; however, mileage will definitely vary based on the speed of the db connection, the number of concurrent tasks, etc.

> I hardly see the need to reach out to the result backend (I imagine celery could check in the queue if the task was picked up and acknowledged way faster - without reaching out to the backend).

I am not sure I follow the logic here. From my understanding, the results backend is the mechanism the executor uses to determine the state of a task. Therefore, it would be a requirement to query the results backend to get the status. I think the problem here is that this is not done in batch (like it is during the normal executor sync). It seems to me this was a simple oversight when adding this seemingly innocent line, which under the hood requires 2 db connections to the backend - for result.state and getattr(result, "info", None) - due to the lack of celery pooling.
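
As a rough sketch of that cost (assumed celery behaviour, not the Airflow source):

```python
from celery.result import AsyncResult


def check_result(result: AsyncResult):
    # For a task that has not completed yet, the task meta is not cached on
    # the AsyncResult, so each of these accesses goes through the results
    # backend; with the database backend in a non-forked process, each access
    # opens (and closes) its own DB connection.
    state = result.state
    info = getattr(result, "info", None)
    return state, info
```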

@Kytha (Contributor, Author) commented Sep 4, 2024

> there is also a possible pool configuration, so the celery backend can use pooling. As you mentioned, NullPool is used only when the process is not forked. But... what does it mean that the process is not forked, and is that the case for task submission and result checking?

Yeah, so unfortunately celery will override any pooling config provided here if the process is not forked. Meaning that the Python process needs to have made a call to os.fork() prior to calling get_engine (this is the celery callback that will execute once the process is forked). In the case of the scheduler, it is not a forked process; for workers it would be (with the default Airflow config). This means that for the scheduler, celery is creating a new engine every time get_engine is called.
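
Roughly, the behaviour looks like this (a simplified paraphrase for illustration, not the actual celery.backends.database.session code):

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool


class SessionManagerSketch:
    """Simplified paraphrase of how celery hands out results-backend engines."""

    def __init__(self):
        self.forked = False  # flipped to True by an after-fork callback
        self._engines = {}

    def get_engine(self, dburi, **kwargs):
        if self.forked:
            # Worker side (after os.fork()): engines are cached, so normal
            # SQLAlchemy connection pooling applies.
            if dburi not in self._engines:
                self._engines[dburi] = create_engine(dburi, **kwargs)
            return self._engines[dburi]
        # Non-forked process (e.g. the scheduler): pool-related options are
        # dropped and a fresh NullPool engine is built on every call, so every
        # query opens a new DB connection.
        kwargs = {k: v for k, v in kwargs.items() if not k.startswith("pool")}
        return create_engine(dburi, poolclass=NullPool)
```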

@Kytha (Contributor, Author) commented Sep 4, 2024

> are you using pgbouncer for your result backend? Because I believe this might be the actual root cause of the overhead

For my airflow setup, I am not using PGbouncer, but I am using RDS Proxy for database connection pooling.

I can understand the logic behind using NullPool if there is a database proxy in place, but in this case celery is creating a new engine for every query, which the linked Stack Overflow answer advises against due to the increased overhead.

@Kytha (Contributor, Author) commented Sep 4, 2024

> so I suspect what you see is a result of some environmental setup that simply "boosts" the hotness of that code artificially

For additional clarity, here are some of the relevant Airflow config options used during the testing:

AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT=false
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=false
AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=320
AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE=100
AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP=100
AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY=64
AIRFLOW__CORE__PARALLELISM=2000
AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT=2000
AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_ENABLED=True
AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False
AIRFLOW__SCHEDULER__SCHEDULER_IDLE_SLEEP_TIME=0
AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False

@potiuk (Member) commented Sep 4, 2024

Yeah. I am convinced, but @ashb - maybe you do remember (though it's 4 years ago) why this line was added in #10949 (see the description of the issue - it has a very nice explanation).

@o-nikolas (Contributor) commented

@ashb Just another ping post-Airflow Summit: do you want to weigh in? Or shall we go ahead and merge this?

@potiuk (Member) commented Oct 2, 2024

OK. Let me merge it then - there does not seem to be a good reason for it and no answer from those who might remember, so let's just merge it.

@potiuk merged commit f463e2a into apache:main on Oct 2, 2024

boring-cyborg bot commented Oct 2, 2024

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.
