Add the ability to backfill a DAG based on past Asset Events #59886
-
Hello!

Airflow version: Airflow 3.1.4

Context: I'm currently working with DAG scheduling on Asset Events. I noticed that if you create a DAG which triggers on an Asset Event, there is no usable way to backfill it based on past Asset Events, for example if those Asset Events were created before the DAG existed, or while the DAG was paused. I found two work-arounds, both with their caveats:
I identified one more usable solution: a script automating the second work-around. It makes things scalable, but it still has the caveat of running all downstream DAGs.

Questions: I'm opening this discussion to ask some questions:
Potential solution: Instinctively, the solution I see to address my need is:
What do you think?
-
This seems like a very specific need, and I think you should just use a Python script: use the Python Client to trigger such assets the way you want via the API. The thing about backfills is that their whole "management" is based on data intervals, so it's easy to specify "between this and that interval", for example. I do not see an easy UX where you could "declaratively" define which assets to regenerate; it's far easier and far more flexible to write a simple Python script that generates the set of API calls you want to "backfill".
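To make that concrete, here is a minimal sketch of such a script, not a definitive implementation. The /api/v2/assets/events path and the payload fields below are assumptions to verify against the stable REST API reference for your exact version, and the helper name and selection loop are illustrative only:

```python
# Sketch: re-publish selected past asset events through the REST API so the
# downstream DAGs get triggered NOW. Check the endpoint path and payload
# fields against the stable REST API reference for your Airflow version.
import requests

AIRFLOW = "http://localhost:8080"
HEADERS = {"Authorization": "Bearer <token>"}  # adapt to your auth setup

def create_asset_event(asset_id: int, extra: dict | None = None) -> None:
    """Publish a brand-new asset event, as if the producing DAG had just run."""
    resp = requests.post(
        f"{AIRFLOW}/api/v2/assets/events",
        json={"asset_id": asset_id, "extra": extra or {}},
        headers=HEADERS,
    )
    resp.raise_for_status()

# Your script decides, by whatever criteria you want, which past events to
# re-publish (this list is a placeholder):
for past in [{"asset_id": 42, "extra": {"note": "replay"}}]:
    create_asset_event(past["asset_id"], past["extra"])
```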
-
Well, what would you use as selection criteria? Time (which we know cannot be used, because the dag is not time-triggered)? Or what would your criteria be?
Same as the current time-range backfill. If your time-range Dag is backfilled, it should (I believe) also generate asset events if its runs produce assets. Unless you are talking about Dags downstream of the asset, not Dags downstream of the Dag being backfilled.
I am not sure, but if you do not want to reprocess "everything" that asset generates, possibly what you want is to "clear" all the tasks in specific DagRuns rather than backfill them. I guess you want something that will allow you to clear (and effectively re-run) all the tasks for a set of dag_runs. In which case you could simply clear all the tasks in the DagRun, which is what https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/clear_dag_run does, and then you are back to "how do I select the dag_runs to clear". Which I (again) think is far better done with a script than with a "backfill UI" modification, because you would somehow have to specify "backfill those particular dagruns" with some selection criteria.
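A hedged sketch of calling that clear_dag_run endpoint from a script, assuming the /api/v1 stable REST API base path and token auth (adjust both for your deployment):

```python
# Sketch: clear (and thereby re-run) one DagRun via the clear_dag_run
# endpoint linked above.
import requests

AIRFLOW = "http://localhost:8080"
HEADERS = {"Authorization": "Bearer <token>"}

def clear_dag_run(dag_id: str, dag_run_id: str) -> None:
    """Clear all tasks of one DagRun so the scheduler re-runs them."""
    resp = requests.post(
        f"{AIRFLOW}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/clear",
        json={"dry_run": False},  # dry_run=True only reports what would clear
        headers=HEADERS,
    )
    resp.raise_for_status()
```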
-
Backfill does not use "when the asset or dag run was created"; backfill works on "data intervals". For backfill, it does not matter when your dag was triggered; what matters is which "data interval range" it covers. Each scheduled dag run has an "interval start" and "interval end", automatically derived from the schedule, and those interval ranges are non-overlapping. I.e. if you select backfill for the last 28 days, covering 4 weekly dagruns, those 4 dagruns will be backfilled, i.e. the "whole month". So when you select backfill dates you do not select "when the dagruns were triggered" but "what data you are backfilling".

For example, if I backfill May 2025 today, all the dagruns that concern the "May 2025" data intervals will be backfilled. Which means that if there were dag runs in May 2025, the new runs will effectively replace the runs from "May 2025", but they will be triggered in December 2025. If I backfill the same May 2025 data a year from now (December 2026), then the backfill dagruns triggered in December 2025 will be replaced with new dag runs, even though they were TRIGGERED in December 2025, because those dagruns refer to the "data intervals" of May 2025.

In your case you have no relation between "data intervals" and "when your asset was triggered". An Asset does not have a concept of "data interval" at all. An asset triggering a run is just triggering a run; that run is not associated with any data interval. This is very succinctly described in the backfill documentation: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/backfill.html
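As an illustration of that point (plain Python, not Airflow code), here is how a 28-day backfill of a weekly DAG maps onto data intervals:

```python
# Illustration only: a backfill request maps a date range onto the schedule's
# non-overlapping data intervals, regardless of when any run was triggered.
from datetime import date, timedelta

def weekly_intervals(start: date, end: date):
    """Yield the (interval_start, interval_end) pairs covering [start, end)."""
    cur = start
    while cur < end:
        yield cur, min(cur + timedelta(weeks=1), end)
        cur += timedelta(weeks=1)

# Backfilling 28 days of a weekly DAG selects these 4 intervals; each new run
# is triggered NOW but stays tied to its past interval.
for lo, hi in weekly_intervals(date(2025, 5, 4), date(2025, 6, 1)):
    print(lo, hi)
```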
Logically, what you are asking for is the ability to "clear" certain, selected dag runs. In your case you think that "when the dag was triggered by an asset" is the "trigger date" criterion you want to use. But this is not a backfill, because backfill operates on "data intervals", not "triggering dates". So what you really want is to:
This is not backfill, at least not as we define the word "backfill"; it's simply "clearing selected dagruns". And the way backfill is implemented, operating on adjacent, non-overlapping data intervals rather than "trigger dates", means repurposing backfill to do what you want makes very little sense and would be terribly confusing for those who understand backfill as "back-filling data intervals". So... if you want to do it now, you have two options:
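For the script option, a sketch of the selection-then-clear loop. The start_date_gte/start_date_lte filter params are taken from the list-dag-runs endpoint of the stable REST API, and clear_dag_run() is the helper from the earlier sketch; both are assumptions to verify against your version's API reference:

```python
# Sketch: pick dag runs by your own criteria (here, runs triggered inside a
# time window) and clear each of them so they re-run.
import requests

AIRFLOW = "http://localhost:8080"
HEADERS = {"Authorization": "Bearer <token>"}

def rerun_runs_triggered_between(dag_id: str, gte: str, lte: str) -> None:
    resp = requests.get(
        f"{AIRFLOW}/api/v1/dags/{dag_id}/dagRuns",
        params={"start_date_gte": gte, "start_date_lte": lte},
        headers=HEADERS,
    )
    resp.raise_for_status()
    for run in resp.json()["dag_runs"]:
        clear_dag_run(dag_id, run["dag_run_id"])  # re-runs all of its tasks
```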
-
You are contradicting yourself here. In this case you just start a NEW dag run. It does not "catch up" with anything, because it will be created NOW, not in the past. Again, you have no linking of the asset-triggered dag with "time"; this is where your logic is completely broken.

If you want to "catch up" with, say, "5 missing past events", those will just be 5 NEW asset events that create new dag runs. Those dag runs will have NO "past" meaning at all. They will be triggered "now", and there is no way you can "catch up" with anything; you will simply have 5 new runs of the same dag. There is no concept of time or time interval associated with a dag run triggered by an asset. Those 5 new dag runs will not be created in the past (backfill does not do that either, because it makes no sense whatsoever: when you backfill (timed) dags, each run will be associated with some past "data interval", but they will all be triggered NOW). So you are not "catching up" with anything; you are just creating 5 new dag runs.

In order to somehow associate your dags with some "time" to catch up with, you would need to specify that time somehow as "extra" of your asset, in a way that is specific to your asset, because, as I already explained several times, an asset-triggered dag has NO time relation. It's just a dag run triggered by an asset; it tells nothing about the "time" of that event, except the time when it was triggered. And that time changes: if you want to run the dag again, it will be triggered "now" and its triggering time will be NOW, not in May. You can "clear" a past run, yes, but you cannot create a past-time asset-triggered dag run unless you specify time in an asset-specific way. And it makes absolutely no sense to catch up old dags if you do not want to trigger downstream dags. If you want to trigger new Dag Runs that have no downstream dags, you should define them like that and run them. I see zero reason for
I have no idea what that event would do in terms of the "catchup" described above. There are no past dagruns, so whatever queued events relate to past dags... especially since (as I explained in detail several times) there are no asset-triggered dag runs connected with some time. They have a "create time", and if you create a new dag run, it is always created NOW.
-
Probably what you want:
This is all easiest done with a script, because you need some things that your dag will handle by convention (one such convention, carrying a timestamp in the event's extra, is sketched after this comment):
This is, if I understand correctly, because for some strange reason you do not want to use timetables and scheduling and data intervals (which are more appropriate for such "timed" dag runs). Why you want to do it is a mystery, because it seems like custom emulation of the "data interval" feature that "scheduled" dags already have; it all looks like some strange custom approach you want to use without understanding that scheduled dags are best run by... a schedule, not assets. That seems like using a screwdriver to drive in a nail. If you want to do it on your own, feel free, but scheduled dags were a feature of airflow long before assets, and emulating them with assets, when there is a fully featured, schedule-based implementation, makes completely no sense to me.
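For completeness, a sketch of what such a convention could look like, assuming the Airflow 3 authoring API (airflow.sdk) with its outlet_events and triggering_asset_events accessors; names and signatures should be double-checked against the asset documentation for your version:

```python
# Sketch of the "by convention" part: carry the logical time of the update in
# the asset event's extra, since asset-triggered runs have no data interval.
from airflow.sdk import Asset, task

my_asset = Asset("s3://bucket/my-data")

@task(outlets=[my_asset])
def produce(*, outlet_events):
    # Producer attaches the "time the data refers to" to the event it emits.
    outlet_events[my_asset].extra = {"logical_date": "2025-05-01"}

@task
def consume(*, triggering_asset_events):
    # Consumer reads that timestamp back instead of relying on run dates.
    for event in triggering_asset_events[my_asset]:
        print(event.extra.get("logical_date"))
```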
-
Let's not get stuck on semantics. From the beginning of your answer, it seems neither "backfill" nor "catch-up" is the right terminology.
I think you are misunderstanding here: I don't want to "catch up old dags" without "triggering downstream dags". Take back my example with DAG_1, DAG_2 and DAG_3. The issue now is that those DAGs are not necessarily created at the same time. When creating one of the downstream DAGs later, does it still make absolutely no sense to trigger it independently? Indeed, if there were 5 asset events in the past, which means my data had 5 updates in the past, I want to trigger 5 new DAG runs, NOW, to process the data with my new DAG. How would you call this? Is it such an exotic or wrong use case?
To reformulate: I want to create, NOW, an asset-triggered dag run, giving it as input a past asset event.
Let's say a run of DAG_1 just finished and emitted an asset event. At this moment, if DAG_2 is unpaused, the event is queued for it and DAG_2 gets triggered. However, if DAG_2 is paused or not created at this moment, then it won't have anything queued. To circumvent this, manually calling an endpoint that puts an existing past event in a DAG's event queue would do the trick. Ultimately, this endpoint would allow DAG_2 to process the data it wasn't able to process because it was paused, without having to touch DAG_1.
From your second message, I really think you are misunderstanding my use case, probably because I used the terms "backfill" and "catch-up", which confused you. I hope the beginning of this message gives you a better understanding.
-
Yes, you mix it up completely. Because backfill and catch-up strictly refer to "time intervals". And those only make sense for scheduled dags.
There are no new APIs needed for that. You can just trigger the asset event manually: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/create_asset_event -> if DAG_2 and DAG_3 are unpaused and you want to trigger them "as if DAG_1 did it", you just have to create the asset event, exactly the same as DAG_1 would do. DAG_1, when completing, simply creates the asset event that is its output. You can do the same via the API; you just need to know which asset to create and which extras it should have. Once you create it, DAG_2 and DAG_3 will be triggered. You just need to know which assets, and with which parameters, you should trigger. And, as I have repeated several times, you just need to (best) write a script that will find out, according to your criteria, which asset events to trigger and with which parameters. This is what you can do now, and what I think makes sense.

However, if you want to turn it into something with a UI, being able to say "re-publish all the events from the past, using some criteria, and trigger all the Dags that have not been triggered by those events" (which is what I understand from your description), then you are welcome to propose an Airflow Improvement Proposal, as for any other big new feature. But I am afraid you have a vastly oversimplified view of how assets, triggering and events work in Airflow. You limited your case to a single dag producing a single event that triggers another Dag. This is but a fraction of what Airflow assets and events do.

Currently, events in Airflow do not have "state": they get triggered, they trigger whatever is currently subscribed to them according to its condition, which might be arbitrarily complex. In many cases those events will simply be archived very quickly; in big systems there might be thousands of events per minute. So currently, when an event happens, any conditions that use it get re-evaluated, and that's it. But you seem to want to persistently store the state of events over their history, basically tracking all the events triggered and re-evaluating triggering conditions. You likely missed this chapter: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/asset-scheduling.html#advanced-asset-scheduling-with-conditional-expressions

Currently the whole design of assets is based on the notion that a Dag simply "waits" for its triggering condition (which might be arbitrarily complex and contain logical operations); it accumulates incoming events as they come, calculates the condition based on those events, and "fires" the downstream dag when the condition is met. The only state that is stored is the "current input events state for the dag", and it's just "current". We do not keep its history; the conditions are evaluated looking at what happened since the last time the Dag was run. For example, if the same event is generated several times but the downstream dag waits for another condition to be met (with some boolean logic applied), the waiting dag basically "discards" those duplicated events, treating them as a "single" event. So if you want to add a new Dag with some conditions, you would not only have to find past matching events; you would basically have to replay the whole history of those events, track the status of the logical conditions and how they changed over time, and determine how many times you need to trigger the new Dag, essentially simulating all the events that were triggered in the past.
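For readers following along, a small sketch of such a conditional triggering expression, assuming the Airflow 3 airflow.sdk imports (asset names are made up):

```python
# Sketch of an arbitrarily complex triggering condition, as in the chapter
# linked above.
from airflow.sdk import DAG, Asset, task

raw = Asset("s3://bucket/raw")
manual = Asset("s3://bucket/manual-fix")
ref = Asset("s3://bucket/reference")

with DAG(
    dag_id="conditional_consumer",
    # Fires when (raw OR manual) has a new event AND ref has one too.
    # Duplicate events for the same asset are collapsed while waiting,
    # which is exactly why "replaying history" for a new DAG is hard.
    schedule=(raw | manual) & ref,
):
    @task
    def process():
        pass

    process()
```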
Possibly, you could use QueuedEvents, but this is not a question of "tweaking an API"; it means designing and implementing a huge new feature, considering data usage, consequences, storage mechanisms and persistence of the data, as well as the performance impact of such re-evaluations. It's not a backfill, nor catch-up (those are simple because they act on data intervals, not on events and conditions that can be arbitrarily complex). And it certainly won't use the backfill UI, because the implementation and mechanism of backfill won't be usable for it, pretty much at all. When you prepare your proposal it might have a similar UI, depending on the kind of criteria you want to apply to past events, but this is a completely new feature. So, similarly to what the Backfill proposal did (https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-78+Scheduler-managed+backfill), such a document will have to be created and discussed on the devlist; if there is feedback, the feedback should be applied by you, and then the final proposal should be voted on the devlist, same as Scheduler Managed Backfills were. If you get a successful vote, you can proceed to implementing it.

But my suggestion, if you decide to pursue it: please, please, please stop confusing it with Backfill or catch-up. Come up with another name. Backfill has a very concrete meaning in Airflow, and its roots are in data intervals, so everyone will be completely confused if you keep naming it "backfill". What you really want is an interface to track the history of asset events and to replay that history, re-evaluating the conditions to trigger downstream dags and re-triggering them according to the history of what happened.

There is a somewhat related discussion, https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-93+Asset+Watermarks+and+State+Variables -> asset watermarks, which also talks about some event state that is likely related. The thing here is that if you want to do something like that, you also have to store some state about the events, a bit different but related. So the discussions about those new proposals should likely be coordinated.
-
That's basically what I described in my very first message, with the automation I mentioned. Actually, the caveat I mentioned can also be worked around: to trigger DAG_2 or DAG_3 independently and avoid potentially unnecessary processing, the script must also pause all DAGs that don't need to be triggered, and un-pause them at the end (see the sketch below). Given the rest of your answer, a more integrated solution, either an API to put an existing event in a DAG's event queue or a UI to facilitate it, would raise many more concerns, especially for use cases with complex triggering conditions. Thanks for all your answers. I'll go for the script, and meanwhile I'm satisfied to have shared the need and assessed that I couldn't easily contribute to Airflow to address it in a more integrated way.

Maybe one last question for you: is my use case so exotic? I'm surprised that the need to process data updates that happened when a DAG was not created yet is not something common. If it's exotic, then it might hide a mis-usage of Airflow on my side?
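A sketch of that pause/publish/unpause sequence, assuming the PATCH dag endpoint of the /api/v1 stable REST API and reusing a create_asset_event() helper like the one sketched earlier in the thread (helper names are hypothetical; verify paths against your version's API reference):

```python
# Sketch: pause every other consumer so only the target DAG reacts to the
# re-published event, then restore the paused flags afterwards.
import requests

AIRFLOW = "http://localhost:8080"
HEADERS = {"Authorization": "Bearer <token>"}

def set_paused(dag_id: str, paused: bool) -> None:
    # PATCH dag: flips the is_paused flag of a single DAG.
    resp = requests.patch(
        f"{AIRFLOW}/api/v1/dags/{dag_id}",
        json={"is_paused": paused},
        headers=HEADERS,
    )
    resp.raise_for_status()

def replay_event_for_one_dag(other_consumers: list[str], event: dict) -> None:
    for dag_id in other_consumers:
        set_paused(dag_id, True)  # keep the others from firing
    try:
        create_asset_event(event["asset_id"], event.get("extra", {}))
    finally:
        for dag_id in other_consumers:
            set_paused(dag_id, False)  # un-pause even if publishing failed
```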
-
We have no idea; you are the first one to ask this question. And to be honest, it does not matter how "exotic" it is. What matters is whether someone (for example you) is willing to turn it into a product feature, and whether the community decides to take on the maintenance burden if it is contributed. So I do not know about "exoticness"; I have no data to judge it for now. But I am sure that if you want to modify your Dag inter-dependencies in the past and reprocess events from the past in the general case, this is a rather complex "feature" once you include our event model and conditional processing. That means it will cost a LOT to develop as a feature and will be costly to maintain. But it's far simpler if you consider a simplified case like yours, where you want to modify your complete dependency set (by adding new dags) and "pretend" they were always there, and where you can "simplify it" because you use a small subset of our event feature. And developing a script that handles your simplified case using the APIs we have is not only possible but also relatively easy (but only if you limit it to your specific case, where you have heavily limited the scope, because it's "your case"). There is a big difference between a "one-time solution", a "reusable solution" and a "product feature", and there are rule-of-thumb calculations there:
And this is a "rule of thumb"; 9X is very conservative for many cases, and it only works for really simple ones. Also, when you make something into a product, there is the cost of maintaining the solution, running and fixing tests continuously, fixing bugs, and the impact it has on developing new features and refactorings of the product (Apache Airflow) it interacts with. So, as I suggested from the very beginning, a one-time solution done by you is cheap and easy for you to test, and seems like the best option for you. Turning it into a "product feature", you will have to spend a LOT more effort; think of spending an order of magnitude more time on it. Which you of course might want to do, if YOU are convinced it's not an exotic case. We very much welcome new AIP proposals; even if they don't make it in the end, there is always something to learn from them. And our AIP (Airflow Improvement Proposal) process is actually designed to answer many of those questions. When you make an AIP proposal on the devlist you will find out:
And I am not the one to make decisions or judgments there; that's not my role as a maintainer. My role is to respond here, try to understand what you are asking for, and point out the things that are important. There is the whole community for that. So... I have no idea if your use case is "exotic"; I am not able to answer that question, but you can likely find out by proposing an AIP and discussing it on the devlist.