
Add a cache to Variable and Connection when called at dag parsing time #30259

Merged
merged 29 commits into from
Aug 10, 2023

Conversation

vandonr-amz
Contributor

@vandonr-amz vandonr-amz commented Mar 23, 2023

A lot of users set up Variable.get calls for a bunch of configuration options in their DAGs with reasonable defaults, just to be able to set them dynamically if needed.
Because those variables are not set, we have to look in the custom backend (usually an HTTP request), the environment variables (that particular step is not a problem), and the metastore (a DB call) each time.
Variables are often fetched at the top of the DAG files, so those calls are made on every DAG parsing pass, slowing down the whole process.

Furthermore, accessing remote secret managers is rarely free, and having a call made for each variable every time a dag file is parsed can yield high (and unexpected) API charges for cloud computing users.
AWS secret manager: $0.05 per 10,000 API calls
GCP secret manager: $0.03 per 10,000 operations
Azure key vault: $0.03/10,000 transactions

An alternative could be to set up a cache in those specific secret backends, but that multiplies the work required. And since the custom backend is always the first thing that's looked at, the behavior would be the same.

I'm proposing a component-level cache, disabled by default (with a default TTL of 15 minutes), which seems like a good compromise between reducing the number of calls and keeping some reactivity to changes.
It's configured to be active only when parsing DAGs (i.e. in the scheduler(s) or DAG processor).
This would be marked as an experimental feature, given the potential side effects that may break some setups.

Since most of the Variable fetching happens in isolated processes during DAG parsing, I'm using a Manager to have a synchronized cache, handled under the hood in its own process.
I tried to keep the impact on the codebase as low as possible. It only requires an init call before the DAG parsing processes are forked, to make sure the cache has been initialized in the parent process so that it is accessible from the children via the copied memory.

For now, there is no bound on the cache size, and no cleanup. If this turns out to be a problem, we can set up a cleaning step, for instance just before DAG parsing, where I inserted the init() call.
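The proposal above can be sketched as follows. This is an illustrative outline, not the actual PR code: the names `get_cached` and `invalidate` are hypothetical, and a fork-context `multiprocessing.Manager` stands in for the init-before-fork requirement described above.

```python
import multiprocessing as mp
import time

TTL_SECONDS = 15 * 60  # the proposed default TTL of 15 minutes

# The Manager must be created in the parent before the parsing
# processes are forked, so that every child talks to the same
# cache server process ("fork" start method, POSIX only).
_manager = mp.get_context("fork").Manager()
_cache = _manager.dict()  # key -> (value, stored_at)


def get_cached(key, fetch):
    """Return the cached value for `key`, calling `fetch` on a miss
    or when the entry is older than the TTL."""
    entry = _cache.get(key)
    if entry is not None and time.monotonic() - entry[1] < TTL_SECONDS:
        return entry[0]
    value = fetch(key)  # e.g. the secrets-backend / metastore lookup
    _cache[key] = (value, time.monotonic())
    return value


def invalidate(key):
    """Drop an entry, e.g. when Variable.set writes a new value."""
    _cache.pop(key, None)
```

Because the `Manager` runs the dict in its own server process, forked children reach it through a proxy rather than through copied memory, which is what makes the cache shared across parsing processes.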

@boring-cyborg boring-cyborg bot added area:Scheduler including HA (high availability) scheduler area:secrets labels Mar 23, 2023
@vandonr-amz
Contributor Author

creating as draft while I fix any CI problems

@Taragolis
Contributor

AWS secret manager: $0.05 per 10,000 API calls
GCP secret manager: $0.03 per 10,000 operations
Azure key vault: $0.03/10,000 transactions

My evil twin said: "The users should pay for not following Best Practices"

Is this cache optional and disabled by default? Does it affect templated access to variables, or access inside a Python callable?

@potiuk
Member

potiuk commented Mar 23, 2023

I think this is not going to work. One reason is that it would change Airflow's behaviour in unexpected ways; another is that the caching you implement (in-memory, per process) does not actually work because of our process-fork model.

Many people use variables for exchanging data between tasks, as a simple XCom replacement. Adding a cache significantly changes the promises we made about variables.

Using variables at the top level of DAG code is not recommended (cache or not), because it involves DB operations and, as explained, is costly.

There is, however, a much bigger problem. I fell into the same trap once, and since then I have pointed it out in several related discussions.

Storing the values in an in-memory cache does NOT solve the problem at all. You might think it does, but the in-memory cache is only visible inside the process that owns the memory; other processes cannot access it. We use forking and multi-processing A LOT. Like REALLY a LOT:

  • every single dag parsing process is a new process forked to parse that particular file
  • every worker that executes the tasks is a new process forked to process the task

This is done to maintain isolation between parses and tasks (so, for example, if a parsing process or a task crashes, it only crashes the fork, not the DagFileProcessor).

The problem is that if the cache is not filled before the fork happens, the values in the cache are only used by that fork. Filling the cache after the fork will not fill it for the other processes. You would have to pre-emptively read all the variables into the cache (in the parent process) in order to reuse them in forks, but you would have to parse the files to know which variables to read, so there is a chicken-and-egg problem.

The effect is that basically everywhere you would like to gain some benefit from the cache, you gain nothing. If you have a variable at the top level of a Python file, it will be retrieved precisely ONCE during parsing of that file, and the cached value is discarded when that particular parsing finishes. Parsing the same file again (or another file) starts from a clean cache, because the parsers are forked from a DagFileProcessor that has an empty cache.

The same goes for workers: every time a task runs, a new fork is created, and its cache is empty. So if you have a variable at the top of a DAG and you run 10 tasks from that DAG, even on the same worker, the cache is empty for every task, and you retrieve the same value 10 times anyway, filling 10 different caches.
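The fork-isolation point can be demonstrated with a few lines of plain Python. This is a hedged, self-contained sketch (the function name `parse_dag_file` is illustrative, not Airflow code): each forked "parser" starts from a copy of the parent's empty dict, and its writes die with the fork.

```python
import multiprocessing as mp

# A plain per-process dict, as in a naive caching implementation.
cache = {}


def parse_dag_file(q):
    # Runs in a forked child: the cache miss is filled here,
    # but only in the child's copy of `cache`.
    cache.setdefault("my_var", "expensive lookup result")
    q.put(len(cache))


def demo():
    ctx = mp.get_context("fork")  # mirrors Airflow's fork-based model; POSIX only
    q = ctx.Queue()
    for _ in range(2):
        p = ctx.Process(target=parse_dag_file, args=(q,))
        p.start()
        p.join()
    sizes = [q.get(), q.get()]
    # Every child saw a cold cache, and the parent's dict stays empty.
    return sizes, len(cache)
```

Running `demo()` shows both children reporting a cache of size 1 that they each had to fill themselves, while the parent's cache remains empty, which is exactly the "10 tasks, 10 different caches" effect described above.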

The only POSSIBLE solution is to use an external cache (Redis/Memcached) to store the values and read them from there. But that is a LOT of hassle, and it also breaks Airflow's promise of "synchronous" changes to variables. Operationally it would also be an additional component to manage and extra complexity. This is likely not something we really want to implement as part of Airflow. For secrets we already have a good (I think) solution with lookup patterns (#29580), and local caching agents might be a good solution too (Vault has one, for instance). Also, simply using Variables at the top level of your DAG is a very, very bad idea.

Sorry to be a bit brutal here, but unless some of the things above can be challenged (I would love to get it challenged and discussed, and maybe there is a solution I cannot see), I am afraid the change is not doing what it is intended to do.

potiuk
potiuk previously requested changes Mar 23, 2023
Member

@potiuk potiuk left a comment


Requesting changes so that it is not merged accidentally, but I think it's not gonna fly (see detailed comment for explanation).

@potiuk
Member

potiuk commented Mar 23, 2023

BTW. This is very nice illustration of the famous quote

There are only two hard things in Computer Science: cache invalidation, naming things, and off-by-one mistakes.

@potiuk
Member

potiuk commented Mar 23, 2023

My evil twin said: "The users should pay for not following Best Practices"

My evil twin says: we won't save such a user, even if we wanted to.

@potiuk
Member

potiuk commented Mar 23, 2023

BTW. Update. Yes, I see you are using a Multiprocessing manager. But I believe it will not really work well when forks happen outside of the multiprocessing machinery, as is done in the DagFileProcessor and Celery processes.

I believe you cannot just have multiprocessing set up after the forking has happened (but I would love to be proven wrong, BTW).

@vandonr-amz
Contributor Author

Yes. I am using a Multiprocessing manager.

It works well when I test it in breeze, but indeed, that's a specific setup, and I'd need to test this in a more prod-like environment, where maybe multiprocessing could show its limits.

About the users not following best practices, I think it's an easy stance to take, but not very helpful. We want users to fall in the "pit of success". If we know that they are doing things that are not recommended, we should either make those things work well anyway, or make it a lot harder to do the not recommended stuff.

Also, about what you said above, users using variables as an XCom replacement (skipping over the fact that this is probably not a recommended practice either?): that would still work, because I invalidate the cache on Variable.set, so the changes are propagated instantly for those.

@vandonr-amz
Contributor Author

vandonr-amz commented Mar 23, 2023

just putting here for the sake of it a little test I did locally, with 100 relatively simple DAG files, each making 8 Variable calls (maybe you're going to tell me that's excessive and shouldn't be done, but hey, I didn't write those DAGs). I ran with and without this change, and logged the DAG processing time. Having a cache improves the processing time a lot. The first parsing pass is always slower (I don't know exactly why in the "main" case), so I excluded it from the min/max/avg.
[chart comparing DAG processing times with and without the cache]
Y axis is in seconds

@potiuk
Member

potiuk commented Mar 23, 2023

Yes. I am using a Multiprocessing manager.

It works well when I test it in breeze, but indeed, that's a specific setup, and I'd need to test this in a more prod-like environment, where maybe multiprocessing could show its limits.

Yeah, I would love to see (actually) if we can make it work at least to some extent. I do see the problem and I understand the need to solve it (taming users' wild ideas is not easy, I am aware of it), especially in Celery, where we have limited impact on what's going on. You should also be aware that Celery uses billiard as a direct multiprocessing replacement. This is a forked multiprocessing library that has a number of under-the-hood things "corrected", and there have been multiple reports from users whose multiprocessing code crashed in Celery-run tasks (there are 33 closed issues mentioning billiard: https://github.com/apache/airflow/issues?q=is%3Aissue+billiard+is%3Aclosed).

About the users not following best practices, I think it's an easy stance to take, but not very helpful. We want users to fall in the "pit of success". If we know that they are doing things that are not recommended, we should either make those things work well anyway, or make it a lot harder to do the not recommended stuff.

I was actually thinking of completely blocking DB calls during DAG parsing. That would be more helpful. I think caching and TTLs might have very unexpected behaviours and hit our users much harder than slow parsing, so if we do this, it has to be done extremely carefully. I can see multiple scenarios where things might go wrong.

Also, about what you say above, users using variables as XCOM replacement (skipping over the fact that this is probably not a recommended practice as well ?), that would still work because I invalidate the cache on Variable.set, so the changes are propagated instantly for those.

Unfortunately not. It will only work in a single-machine setup where all processes somehow share the same memory. It stops working when two tasks run on two different machines, because (unlike the non-cached variable in the DB) the second machine will only see the change after the TTL passes (unless, of course, you propagate the "set" state to all the machines, but that starts to become pretty complex).

@potiuk
Member

potiuk commented Mar 23, 2023

In any case, this whole idea (even if proven to work in all the complex scenarios), because of its TTL behaviour, will have to be brought to the dev list for discussion, because it has the potential of changing some basic behaviours and promises of Airflow.

@vandonr-amz
Contributor Author

ok, yeah, happy to discuss it in the mailing list.

multiprocessing.Manager() also supports remote calls over the network, but then it's kind of moot, because we'd replace one network call with another.
Actually, if we could have some sort of caching working only at DAG parsing time, I think that'd already be pretty helpful for the many users who don't follow the best practices.

@potiuk
Member

potiuk commented Mar 23, 2023

multiprocessing.Manager() also supports remote calls over the network, but then it's kind of moot, because we'd replace one network call with another. Actually, if we could have some sort of caching working only at DAG parsing time, I think that'd already be pretty helpful for the many users who don't follow the best practices.

Possibly, but it very much depends on the DAGs they have. If you have a big DAG with many small-ish tasks, the whole DAG is parsed every time a task is executed (in certain situations it even happens twice, for example when impersonation is involved). Once or twice for every single task execution. If you have dynamically mapped tasks, it happens every time for every instance of the mapped task.

So the advice "don't do anything heavy in DAG top-level code" IS actually helpful (and serious) advice.

BTW. When it comes to savings on secrets, the Lookup patterns by @vincbeck are a very good solution #29580

@vandonr-amz
Contributor Author

btw, the Celery doc mentions the Managers... without explaining in very clear terms whether they are supported or not, but I guess the fact that they mention them means they're supported?
https://github.com/celery/billiard/blob/main/Doc/library/multiprocessing.rst#sharing-state-between-processes
They only talk about not supporting semaphores.
But as said before, if it means making a network call anyway, I'm not sure it's going to be a big improvement.

@potiuk
Member

potiuk commented Mar 23, 2023

Yeah. I really did not want to discourage it: if we can improve the situation, that would be a win-win for everyone (though it has a small downside of slowing the "education" of users who might only realise later that top-level code is bad).

I am really seriously thinking that the right solution is not to "mitigate the problem" but to "prominently make users aware that they are not doing the right thing". Trying to think out of the box: maybe a good solution would be to display a warning in the UI ("you are using Variables at the top level, and this is a bad idea"). I am afraid we are applying a band-aid which might at most delay the problem, and the right thing is to influence (and clearly flag problems in) the behaviour of users rather than trying to catch up with the bad behaviours.

@eladkal
Contributor

eladkal commented Mar 23, 2023

About the users not following best practices, I think it's an easy stance to take, but not very helpful. We want users to fall in the "pit of success".

I think this is a harsh statement.

Users who fall into this are users who do not know Airflow and should spend some time learning it before authoring DAGs. I myself have mentored and on-boarded dozens of developers to Airflow. This was in lesson number one.
If we have a way to avoid pitfalls without complicating things, then by all means let's do it, but if not, I suggest we avoid it.

May I also suggest that the pitfall we discuss here is a side effect of a larger question: who is the Airflow user, and what skills are they expected to have?

@uranusjr
Member

I don’t think this is going to work in a backward-compatible way. One possibility would be to introduce some sort of custom variable mechanism, like what we have with XCom. Alternatively, you can always build your own wrapper on top of Variable that does caching and use that instead. Changing how Variable works like this is not viable.
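The user-side wrapper uranusjr mentions could look like the sketch below. The class name `CachedVariable` and the `loader` hook are hypothetical; in real use, `loader` would wrap `airflow.models.Variable.get`, but any `(key, default)` callable works for illustration.

```python
import time


class CachedVariable:
    """Per-process TTL cache in front of a variable loader.

    `loader` is any callable taking (key, default); in practice it
    would delegate to airflow.models.Variable.get."""

    def __init__(self, loader, ttl_seconds=900):
        self._loader = loader
        self._ttl = ttl_seconds
        self._entries = {}  # key -> (value, stored_at)

    def get(self, key, default=None):
        entry = self._entries.get(key)
        if entry is not None and time.monotonic() - entry[1] < self._ttl:
            return entry[0]
        value = self._loader(key, default)
        self._entries[key] = (value, time.monotonic())
        return value
```

Because this wrapper lives entirely in user code, it changes nothing about how Variable itself behaves, which is the backward-compatibility concern raised above; it also only caches within one process, so it inherits the fork-isolation limits discussed earlier in the thread.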

@vandonr-amz vandonr-amz changed the title Add a cache to Variable.get Add a cache to Variable.get when called at dag parsing time Mar 29, 2023
@@ -82,8 +82,7 @@ def get_variable(cls, key: str) -> str | None:

@classmethod
def save_variable(cls, key: str, value: str | None):
"""saves the value for that key in the cache, if enabled"""
cls.init() # ensure initialization has been done
Contributor Author


not doing the init automatically when a value is saved means we control when the cache is enabled.
In this case, init() is only called in https://github.com/apache/airflow/pull/30259/files#diff-827b7a469438ffd1b172ae295134a84c8a914c9ea4c9ea2a7d3de1f1d5aa6bb6R1071
so the cache is only active when parsing DAGs.

@vandonr-amz vandonr-amz marked this pull request as ready for review April 5, 2023 00:09
@ephraimbuddy ephraimbuddy added this to the Airflow 2.7.1 milestone Aug 3, 2023
@potiuk
Member

potiuk commented Aug 6, 2023

cc: @jedcunningham ?

Member

@hussein-awala hussein-awala left a comment


Great work! LGTM

of longer propagation time for changes.
Please note that this cache concerns only the DAG parsing step. There is no caching in place when DAG
tasks are run.
version_added: 2.6.0
Member


Suggested change
version_added: 2.6.0
version_added: 2.8.0

valid. Entries are refreshed if they are older than this many seconds.
It means that when the cache is enabled, this is the maximum amount of time you need to wait to see a
Variable change take effect.
version_added: 2.6.0
Member


Suggested change
version_added: 2.6.0
version_added: 2.8.0

@hussein-awala hussein-awala added the type:new-feature Changelog: New Features label Aug 7, 2023
@hussein-awala
Member

@ephraimbuddy I saw that you added it to milestone 2.7.1; is it OK to add an experimental feature to a patch version, or was it added by mistake?

@potiuk
Member

potiuk commented Aug 9, 2023

@jedcunningham @ephraimbuddy -> how about we take a deep dive and merge this one for 2.7.0?

@ephraimbuddy
Contributor

@ephraimbuddy I saw that you added it to milestone 2.7.1; is it OK to add an experimental feature to a patch version, or was it added by mistake?

I moved everything not merged yet to 2.7.1

@ephraimbuddy
Contributor

@jedcunningham @ephraimbuddy -> how about we take a deep dive and merge this one for 2.7.0?

This seems late, but this has really stayed open for long, and leaving it for another 2-3 months is looong.

@potiuk
Member

potiuk commented Aug 10, 2023

This seems late, but this has really stayed open for long, and leaving it for another 2-3 months is looong.

Quite agree.

of longer propagation time for changes.
Please note that this cache concerns only the DAG parsing step. There is no caching in place when DAG
tasks are run.
version_added: 2.6.0
Member


Suggested change
version_added: 2.6.0
version_added: 2.7.0

valid. Entries are refreshed if they are older than this many seconds.
It means that when the cache is enabled, this is the maximum amount of time you need to wait to see a
Variable change take effect.
version_added: 2.6.0
Member


Suggested change
version_added: 2.6.0
version_added: 2.7.0

@potiuk potiuk dismissed jedcunningham’s stale review August 10, 2023 13:03

As discussed with Ephraim, seems that the comments have been addressed.

@potiuk potiuk merged commit dc8ad0c into apache:main Aug 10, 2023
potiuk added a commit to potiuk/airflow that referenced this pull request Aug 10, 2023
ephraimbuddy pushed a commit that referenced this pull request Aug 10, 2023
ephraimbuddy pushed a commit that referenced this pull request Aug 10, 2023
ephraimbuddy pushed a commit that referenced this pull request Aug 10, 2023
Follow up after #30259

(cherry picked from commit 90fb482)
@vandonr-amz vandonr-amz deleted the vandonr/wip branch August 10, 2023 16:00
ferruzzi pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Aug 17, 2023
ferruzzi pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Aug 17, 2023
Labels
area:Scheduler including HA (high availability) scheduler area:secrets pinned Protect from Stalebot auto closing type:new-feature Changelog: New Features