Prevent premature MySQL disconnect caused by GC after dispose #56044
Conversation
|
@shahar1 Hello! Could I get a review for this PR? |
I don't mind being mentioned directly in general, but in this case I'd rather have someone else review it, as it is not my area of expertise (yet) :) |
|
@wjddn279, were you able to write any unit-tests to validate this functionality? |
|
Of course. I’ll add unit tests to verify the following behaviors in this code:
I’ll think about a few more cases and leave a mention after writing them. |
Force-pushed from 8c901db to 24525c8.
|
@jroachgolf84 |
|
|
```python
connect_args = _get_connect_args("sync")
if SQL_ALCHEMY_CONN.startswith("sqlite"):
    connect_args["check_same_thread"] = False
```
Would be nice to keep the comment.
|
I am kind of hesitant to make such a change which goes against (or rather, does not align with) the recommendations of the SQLAlchemy docs. Looking at the docs (and following what @tirkarthi wrote in #56879 (comment)), maybe a better solution here would be to (also) follow the third option from https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork ?
This will dispose (with the default close=True) all the pooled connections before the fork, which pretty much automatically makes sure that no garbage-collection induced event sends QUIT (because the disposal happens synchronously in the parent's dispose call). That means the parent process will have to re-create the connections, yes, but that's an acceptable overhead, and we could do it only for MySQL. This would keep it more aligned with the recommendations from the SQLAlchemy docs, while targeting the apparent misbehaviour of the MySQL driver (which, BTW, we should likely create an issue for in the MySQL driver if not done already, because it seems that the buggy behaviour is in the driver itself). It would also avoid the custom behaviours introduced here. |
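A minimal sketch (not Airflow code) of the third option from the SQLAlchemy pooling docs referenced above: dispose the parent's pooled connections right before forking, so the child never inherits a live MySQL socket that garbage collection could later close on the parent's behalf. The connection URI and the `fork_worker()` helper are placeholders, not actual Airflow names.

```python
import os

from sqlalchemy import create_engine, text

engine = create_engine("mysql+mysqldb://user:pass@localhost/airflow")  # placeholder URI


def fork_worker() -> int:
    # close=True (the default) synchronously closes every pooled connection
    # in the parent before the fork happens.
    engine.dispose(close=True)
    pid = os.fork()
    if pid == 0:
        # Child: the pool is empty, so the first query checks out a brand-new
        # connection owned exclusively by this process.
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        os._exit(0)
    return pid
```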
|
@potiuk I agree with your approach that, specifically for MySQL, disconnecting all pooled connections before forking can ensure connection safety in forked processes. This is a solution that hadn't occurred to me. However, I still have the following concerns: disposing of the engine means temporarily disconnecting all connections in the pool, which could potentially trigger race conditions that are difficult to predict. I believe that operations with significant side effects, such as closing connections, should be performed at precisely controlled and intentional points (such as explicit connection closure on program termination, or explicitly blocking DB connections in workers) rather than being executed non-deterministically. While this is recommended in the official documentation, I'm not sure it was designed with highly multi-process environments like Airflow in mind. The official documentation describes four approaches, which are guard logic to apply when using an Engine object as-is in child processes. The approach I've implemented in this PR creates an entirely new Engine object in the child process, and my understanding is that with a new object the situation changes such that we no longer need to follow those recommendations. Additionally, by creating a new Session object, we can eliminate the risk factors the documentation mentions.
As you mentioned, the approach I've adopted doesn't appear to be explicitly mentioned in the official documentation, and I believe your suggested solution would also clearly resolve the issue. It's certainly possible to rework the code to adopt the approach you and @tirkarthi suggested. However, I would like to hear more of your thoughts on my reasoning for this solution. |
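A rough sketch of the approach argued for above: after the fork, the child builds its own Engine and Session instead of touching anything pooled by the parent. The names `run_in_child` and `conn_uri` are illustrative, not the actual PR code (which reworks the ORM setup in airflow.settings).

```python
import os

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker


def run_in_child(conn_uri: str) -> int:
    pid = os.fork()
    if pid == 0:
        # Child: a fresh Engine means a fresh pool; connections inherited
        # from the parent are never checked out here, so a later GC pass in
        # the child has nothing shared with the parent to finalize.
        engine = create_engine(conn_uri)
        Session = sessionmaker(bind=engine)
        with Session() as session:
            session.execute(text("SELECT 1"))
        os._exit(0)
    return pid
```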
|
I think we should also understand the context better. We are not talking about a "generic" utility but about the specific ways Airflow uses forking, where forking interacts with SQLAlchemy DB initialization. There are several places where we fork in Airflow (and in Airflow 3 this is quite a bit different from Airflow 2):
Now, assumptions 3), 4) and 5) should be verified to make sure that's really the case (and fixed if not), which leaves only the Scheduler case, where both the forking and the forked process need a database. I think once we verify those assumptions, your concern about unpredictable race conditions
might not be valid - and then the only thing we really need to do is dispose connections before we fork Local Executor processes and re-create the pools. |
|
Just to add: I think this will be simplified or solved once DB access from the dag-processor is also moved to use the task-sdk. I also want to add that passing NullPool could be an option. I was also checking other options.
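A hedged sketch of the NullPool alternative mentioned here: with pooling disabled, every checkout opens a fresh DB connection and closes it on release, so a forked child never inherits a pooled connection in the first place. The obvious trade-off is paying connection-setup cost on every use. The URI is a placeholder.

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

engine = create_engine(
    "mysql+mysqldb://user:pass@localhost/airflow",  # placeholder URI
    poolclass=NullPool,  # no pooled connections survive between checkouts
)
```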
|
|
In Airflow 2.11 the ORM was configured per DAG file processor, which is similar to the implementation proposed here; the comment there references issues with multiple processes. airflow/airflow/dag_processing/processor.py, line 179 in d9ed7b9
|
|
@potiuk To summarize what you've mentioned (for the benefit of others who may review this later), we can classify the cases as follows:
For workers, the dag-processor, and the triggerer, cases 1 or 2 clearly apply, and for the api-server case 1 also clearly applies, since it spawns workers. Therefore only the scheduler when using the Local Executor, which falls under case 3, requires verification. However, since all queries executed in the main process (the scheduler) are performed synchronously, it's evident that no race condition exists. Ultimately, I understand you're saying that concerns about race conditions in this case are not warranted within the Airflow system. The solution will be to not dispose on fork for MySQL, but instead to apply engine.dispose(close=True) for the Local Executor. If you had a different intention in mind, please let me know. Thank you. |
Question (just curious) - are you using some AI to generate those responses? They seem very repetitive and seem to echo back what has already been written (which is very much what AI/LLMs do). It is quite good when a human tries to paraphrase things in their own words - to make sure they understand them - but those kinds of repetitions don't seem to add much value, especially if they are automatically generated. Or am I wrong? |
|
@potiuk |
Then cool! Yeah, I agree paraphrasing things by humans is a good idea! |
Yep. Exactly. Also, I think we should verify all the above assumptions - I am not 100% sure everything I wrote is correct. There are some things added recently - the internal API server in the DAG processor and triggerer - and Airflow ORM initialization might also happen in the DAG processor and triggerer by importing stuff, somewhat accidentally, so I think some more checking should be done to verify that the scenarios I described are 100% correct. |
|
@potiuk Before that, I have a question. Does the DB usage pattern we've classified for each component (whether parent/child processes use DB) reflect a design philosophy? For example, in the dag-processor, child processes don't connect to the DB but instead parse files and pass objects to the parent process. I'd like to know if this is based on a design principle of "dag processor child processes should never directly connect to the DB," or if it's just how it's currently implemented and could change in the future. The reason I'm asking is that our current verification might become invalid later and could potentially lead to new bugs. If this DB usage pattern is not absolute, then the uniform but absolute prevention approaches mentioned by @tirkarthi (NullPool, change driver, recreate engine) could be better long-term solutions. |
Yes - that's the current design philosophy. If it changes, it will go further toward isolation, i.e. both the triggerer and the DAG processor eventually should not access the DB at all; all communication should go through the api-server. Currently it's a bit of a balance between performance and security, but in the future complete isolation will become more important. |
|
We have been trying to use Airflow 3.1.0 these days and are facing the same issue with MySQL 8 (AWS Aurora 3). What is the progress on this? |
|
I’m working on addressing the issue with a focus on the long-term direction, so it may take some time before the solution is fully reflected. However, if you are able to run Airflow with some local code changes, I can suggest a simple modification to address the issue. |
|
@wjddn279 Sure. I can apply a diff file. Thanks in advance! |
|
The root cause is that a connection object which has lost its reference in a forked process gets garbage collected and explicitly quits the connection (only in the case of MySQL). A hacky but simple fix is to keep the object as a key in a dictionary to explicitly maintain its reference: wjddn279@c522ce1 (a sketch of the idea follows below). In my case it works well. Please let me know if it doesn't resolve the issue for you. |
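A sketch of the idea behind the linked workaround: keep a strong reference to every DBAPI connection the engine creates, so the forked process's garbage collector never finalizes one and never sends MySQL a QUIT over the socket it shares with the parent. The event-listener form below is an illustration of the idea, not the exact commit; the engine URI is a placeholder (Airflow's real engine lives in airflow.settings).

```python
from sqlalchemy import create_engine, event

engine = create_engine("mysql+mysqldb://user:pass@localhost/airflow")  # placeholder URI

_keepalive: dict[int, object] = {}  # strong references, intentionally never pruned


def _pin_connection(dbapi_conn, connection_record):
    # Called whenever the pool opens a new DBAPI connection; pinning it here
    # guarantees its refcount never drops to zero in a forked child.
    _keepalive[id(dbapi_conn)] = dbapi_conn


event.listen(engine, "connect", _pin_connection)
```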
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions. |
|
This PR seems critical for anyone running Airflow 3 on MySQL, but we have also been tracking systemic session issues. The idea: in Airflow 3, it seems to me that the FastAPI server and the legacy FAB UI share the same process and thread-local sessions, and FastAPI currently lacks a global session teardown (like Flask's teardown hooks). As for the impact of this PR and a recommendation: I can provide an initial PR to help explain. |
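A hedged sketch of the kind of per-request teardown the comment says FastAPI lacks out of the box (Flask gets an equivalent from its app-context teardown hooks). `Session` here is assumed to be a scoped_session comparable to airflow.settings.Session; this is an illustration, not Airflow's actual API layer, and the engine URI is a placeholder.

```python
from fastapi import FastAPI, Request
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("mysql+mysqldb://user:pass@localhost/airflow")  # placeholder URI
Session = scoped_session(sessionmaker(bind=engine))

app = FastAPI()


@app.middleware("http")
async def remove_session_after_request(request: Request, call_next):
    try:
        return await call_next(request)
    finally:
        # Return the thread-local session's connection to the pool so it
        # cannot leak across requests handled by the same thread.
        Session.remove()
```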
|
Yes - regarding this issue, we are currently discussing an approach where, in the DAG processor, garbage collection is disabled for objects created before spawning subprocesses, in order to prevent GC from collecting the database connection objects. However, as summarized in the comments above, the API server is unlikely to be affected by this issue. We will try to move forward with a patch in this direction as quickly as possible. In the meantime, you may refer to and apply the changes I previously suggested as a workaround: #56044 (comment) |
|
@wjddn279, do you know if there is any final decision on the approach here? We are also facing the same issue, and by applying your PR changes it gets fixed. The workaround doesn't seem that intuitive to me, so it would be better to have a proper fix. |
I am all for merging it once it is rebased and green - we know the root cause, and we know that GC and forks do not work well together because of a) COW and b) this race condition. A similar approach was used in the Local Executor, and benchmarks showed that it's a good idea to deliberately handle GC when forking (it also follows the recommendations that were posted when gc.freeze() was implemented in Python 3.7). So I see no issue in following this one up - as long as it's green, rebased and tested. |
And hearing that from someone who tested it in their own installation makes it even stronger. |
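A minimal sketch of the gc.freeze() pattern referenced above (available since Python 3.7): park every object that already exists in the parent in the permanent generation before forking, so the child's collector never walks or finalizes them. That avoids both copy-on-write page churn and the GC-triggered MySQL QUIT described in this PR. `fork_child()` is a placeholder, not an Airflow function.

```python
import gc
import os


def fork_child() -> int:
    gc.disable()   # avoid a collection running between freeze() and fork()
    gc.freeze()    # move all pre-fork objects into the permanent generation
    pid = os.fork()
    if pid == 0:
        gc.enable()  # the child only ever collects objects it created itself
        # ... child work goes here ...
        os._exit(0)
    gc.unfreeze()  # parent restores normal collection behaviour
    gc.enable()
    return pid
```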
|
Yeap, since this issue is expected to be resolved in a future PR (applying gc.freeze), there won't be any additional work on this PR. |
|
So... let's close it :). And work on the "complete" fix. |
|
@potiuk is the plan to include the fix in an upcoming 3.1.x release? |
3.1.6rc1 is out for testing, but if someone provides a fix for it, then it might be included in 3.1.7 for example. |
|
But you can check whether 3.1.6rc1 fixes it - there were a number of fixes that could be related. If it still happens there, reporting it here would be helpful. |
I guess that's why I am confused: we had this PR (#56044), and from this comment (#56044 (comment)) it looks like someone applied it to their installation and fixed the problem. |
related: #56879
Solutions
There are three possible approaches:
Among these, (3) is the cleanest and aligns with SQLAlchemy’s official recommendation: create new engine and session objects when forking processes.
By replacing dispose() with logic to create a new engine in subprocesses, I confirmed that existing connections were no longer finalized and the bug disappeared.
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.