
Dynamic DAG Params behaves differently in manually triggered run vs scheduled run. #39904

Closed
2 tasks done
jianqiuhuang opened this issue May 28, 2024 · 3 comments
Labels
area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet pending-response

Comments

@jianqiuhuang

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.2

What happened?

Dynamic DAG Params behave differently in a manually triggered run vs. a scheduled run.

  • For a manually triggered run, the UI evaluates the DAG-level Params and populates the values in the Trigger DAG page. Those values are then fixed for the DAG run.
  • For a scheduled run, Airflow re-evaluates the Params for each task run. This is unexpected and differs from the manually triggered behavior.

What you think should happen instead?

A scheduled run should behave the same as a manually triggered run, i.e. the values of DAG-level Params should be evaluated only once per DAG run.

How to reproduce

  1. Create a simple DAG that contains a DAG-level Param whose default uses `pendulum.now("America/Los_Angeles")`:

```python
import pendulum
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator


def print_param(date, **context):
    print(date)


with DAG(
    dag_id="test_param_behavior",
    start_date=pendulum.datetime(2024, 4, 16, tz="America/Los_Angeles"),
    schedule='0 * * * *',
    catchup=False,
    params={
        "date": Param(
            default=pendulum.now("America/Los_Angeles").strftime("%Y-%m-%d, %H:%M:%S")
        )
    },
) as dag:
    PythonOperator(
        task_id="print_date",
        python_callable=print_param,
        op_kwargs={"date": "{{ params.date }}"},
    )
```

  2. Scheduled run behavior - The above DAG runs every hour. Once a scheduled run has completed, examine the log of print_date. Then clear the task and examine the log of attempt #2: the timestamp is now a different value.
  3. Manually triggered behavior - Manually trigger the DAG. Once the run has completed, clear the print_date task and examine the log of attempt #2: the timestamp is identical to the timestamp in attempt #1.
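
The drift can be illustrated outside Airflow: the Param default above is a Python expression that runs every time the DAG file is parsed, so each parse (and hence each scheduled task evaluation) can see a different value. A minimal stand-in sketch, using the stdlib `datetime` in place of pendulum (the function name `parse_dag_file` is made up for illustration):

```python
import time
from datetime import datetime


def parse_dag_file():
    """Stand-in for one parse of the DAG file: the Param default
    expression (here datetime.now()) is executed at parse time."""
    return {"date": datetime.now().strftime("%Y-%m-%d, %H:%M:%S.%f")}


# Two "parses" a moment apart see different default values, which is
# why a cleared scheduled task can log a new timestamp on retry.
first = parse_dag_file()
time.sleep(0.01)
second = parse_dag_file()
print(first["date"] != second["date"])  # the defaults drifted
```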

Operating System

Amazon Linux 2

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@jianqiuhuang jianqiuhuang added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels May 28, 2024
@jscheffl (Contributor) commented May 29, 2024

I believe the difference is mainly that you use a non-constant parameter default. If no configuration is given at the trigger point of a DAG run (which is the case for scheduled runs), the defaults are applied. In the case of a manually triggered run, the conf dict is used and overrides the defaults.

If you modify the conf dict at the point of triggering and remove values, the defaults will be used there as well, just as if you trigger via the API and do not pass a conf.

The main issue I see is that with non-constant parameter defaults, the defaults change based on the time of evaluation. At the moment this is a conceptual matter and I would not rate it as a bug.
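
The override behavior described here can be sketched as a plain dict merge (a rough simplification for illustration, not Airflow's actual implementation; the function name is made up):

```python
def resolve_params(defaults, conf=None):
    """Rough sketch: values passed at trigger time override the declared
    defaults; a scheduled run passes no conf, so every evaluation falls
    back to the (possibly volatile) defaults."""
    return {**defaults, **(conf or {})}


# Re-computed on each evaluation in the bug report above:
defaults = {"date": "2024-05-28, 10:00:00"}

print(resolve_params(defaults))                           # scheduled run: defaults win
print(resolve_params(defaults, {"date": "fixed-value"}))  # manual trigger: conf wins
```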

If you need constant parameters throughout a DAG run and your default parameter values are "volatile", then I propose that you capture the params initially in a Python task and return them. The return value is then persisted as XCom, and downstream tasks can use the XCom from that first task to keep their logic constant. Otherwise it would be good to step back from using volatile defaults: if you need a date, try using the logical date of the DAG run instead, or leave the field without a default and compute the required volatile input from other constant facts.
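
The capture-once pattern can be sketched with a plain-Python stand-in (hypothetical names; `xcom_store` stands in for Airflow's XCom table — in real Airflow, a PythonOperator's return value is pushed to XCom automatically and downstream tasks pull it):

```python
from datetime import datetime

# Stands in for Airflow's XCom storage for this illustration.
xcom_store = {}


def capture_params():
    """First task in the DAG: evaluates the volatile value exactly once
    at the start of the run and persists the snapshot."""
    xcom_store["date"] = datetime.now().strftime("%Y-%m-%d, %H:%M:%S")


def downstream_task():
    """Every downstream task (and every retry) reads the same snapshot
    instead of re-evaluating the volatile default."""
    return xcom_store["date"]


capture_params()
print(downstream_task() == downstream_task())  # stable across "tasks"
```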

@jianqiuhuang (Author) commented:

Thanks for the feedback. We will implement a task to persist the values in XCom as a workaround, as you suggested.

For scheduled runs, instead of re-evaluating the default values of DAG params, wouldn't it be more consistent to evaluate the params once (to constant values) at the start of the run and then proceed with the task runs?

@jscheffl (Contributor) commented:

Yes, there was some discussion in the community about changing the (previous, you could call it "legacy") conf that is merged with the later-introduced params, but no decision was made. It might be something that could be made cleaner in Airflow 3, I assume.

@hussein-awala made an attempt in a PR (see #29174) to clean this up... but somehow it never made it.

In general, persisting the params at the point of start sounds reasonable, but it might be treated as a breaking change; I assume many users, on the contrary, rely on defaults being evaluated dynamically. If this behavior is to be changed, at least an option to restore the previous logic would be needed for backwards compatibility.

I'll put the request as a discussion item into a future Airflow 3.0 list.
