
WIP/Idea: Pass task output as outlet to dataset trigger params #37888

Conversation

jscheffl
Contributor

@jscheffl jscheffl commented Mar 4, 2024

This PR is a WIP proposal to resolve feature request #37810.

NOTE: It is just a code preview, therefore WIP.

Idea:

  • Use the XCom / task result as the event extra (if not provided in the Dataset reference)
    • If the dataset defines an extra, use this as params
    • If the task returns a dict, pass this as params
    • If the task does not return a dict, use the task name as the key and the return value as the value
  • Pass the extra to the dataset-triggered DAG as params
    • If multiple events trigger a DAG, the params are merged
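The precedence rules above can be sketched in plain Python. This is only an illustration of the proposal, not Airflow API; `resolve_event_extra` and its parameters are hypothetical names.

```python
def resolve_event_extra(task_id, dataset_extra, task_return):
    """Compute the dataset event extra per the proposed precedence rules."""
    # Rule 1: an extra declared on the Dataset reference itself wins.
    if dataset_extra:
        return dataset_extra
    # Rule 2: a dict returned by the task is passed through as-is.
    if isinstance(task_return, dict):
        return task_return
    # Rule 3: any other (non-None) return value is keyed by the task name.
    if task_return is not None:
        return {task_id: task_return}
    return {}
```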

Open items:

  • Agree on "this is a good idea"
  • Documentation
    • Is this a noteworthy change? Then add a newsfragment
  • Polishing of example(s)

closes: #37810

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Mar 4, 2024
@jscheffl jscheffl marked this pull request as draft March 4, 2024 20:53
@jscheffl
Contributor Author

jscheffl commented Mar 4, 2024

FYI @jedcunningham @uranusjr WDYT? (I thought it would be more complex, but walking through the code it looks quite simple...)

@jscheffl jscheffl changed the title Pass task output as outlet to dataset trigger params WIP/Idea: Pass task output as outlet to dataset trigger params Mar 4, 2024
Comment on lines +1287 to +1291
run_conf = {}
for item in dataset_events:
    event: DatasetEvent = item
    extra: dict | None = event.extra
    if extra:
        run_conf.update(extra)
Member
This feels a bit too heavy-handed, but I like the idea of passing in event extras as the downstream DAG run parameters. (Should we use conf or params for this?)

Contributor Author

The code was just an idea, not fully thought through. When you say "heavy-handed", what do you mean? Too brute-force, so the user does not know what comes out? That we need a better merging mechanism, or a hook to be able to inject a custom merging strategy? Or just "the code is ugly"? :-D

Background: I would assume that in 90% of cases a single DAG triggers a dataset. There might be cases where multiple events come together to trigger a run; in such cases we need to merge the extras. I'd assume the merge is conflict-free most of the time, but you never know. "Last property wins" might be a sufficient strategy for collecting events; otherwise, if users feel there are too many conflicts, individual extras can also be produced conflict-free with individual keys.
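The "last property wins" merge described here is essentially what the reviewed snippet does; a minimal standalone sketch (`merge_event_extras` is an illustrative name, not Airflow API):

```python
def merge_event_extras(extras):
    """Merge extras from multiple triggering dataset events.

    Simple "last property wins" strategy: later events override earlier
    ones on conflicting keys; None or empty extras are skipped.
    """
    run_conf = {}
    for extra in extras:
        if extra:
            run_conf.update(extra)
    return run_conf
```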

conf vs. params:

Yes, params and dag_run.conf should somehow be merged. I believe this is a leftover in the API from the past. conf is the dict used to trigger a DAG; it is persisted as a blob with the DagRun.
At runtime the conf is available in the context as a dict, representing 1:1 the conf used to trigger. No validation, just a dict.
params, in contrast, have default values; conf sets values on top, and the result is JSON validated.
Both params and conf are available in the context and can be used. I believe mid-term we should deprecate the usage of conf in the DAG and consolidate on the (more and better functional) params. But as of today, params only exist at runtime.
@hussein-awala made an attempt here but it did not make it across the finish line: #29174
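The layering described above (params supply defaults, trigger-time conf overrides them) can be sketched like this. `resolve_run_params` is an illustrative name, not Airflow API, and the JSON-schema validation step that real params perform is omitted:

```python
def resolve_run_params(param_defaults, trigger_conf=None):
    """Layer trigger-time conf values on top of param defaults.

    In Airflow itself the result would additionally be validated
    against the params' JSON schema; that step is omitted here.
    """
    resolved = dict(param_defaults)
    resolved.update(trigger_conf or {})
    return resolved
```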

@jscheffl jscheffl force-pushed the feature/37810-use-xcom-output-as-extra-for-dataset-trigger branch from cb3514a to f00f806 Compare March 10, 2024 21:36
@boschglobal boschglobal closed this by deleting the head repository Mar 21, 2024
@jscheffl
Contributor Author

My organization messed up the Airflow repo fork - the data is gone - I will need to re-open the PR later when it is recovered :-(

@jscheffl
Contributor Author

Repo at Bosch was restored, re-opening discussion :-D

@jscheffl jscheffl reopened this Mar 22, 2024
@potiuk
Member

potiuk commented Mar 22, 2024

Repo at Bosch was restored, re-opening discussion :-D

Ufff

@jscheffl
Contributor Author

Another PR supersedes this.

@jscheffl jscheffl closed this Mar 23, 2024
Successfully merging this pull request may close these issues: Annotate a Dataset Event in the Source Task

4 participants