diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 5d1fe9ba8e51e..b818ea08d5714 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -325,6 +325,11 @@ def string_lower_type(val): type=positive_int(allow_zero=False), help="Max active runs for this backfill.", ) +ARG_BACKFILL_DRY_RUN = Arg( + ("--dry-run",), + help="Perform a dry run", + action="store_true", +) # misc @@ -1030,6 +1035,7 @@ class GroupCommand(NamedTuple): ARG_DAG_RUN_CONF, ARG_RUN_BACKWARDS, ARG_MAX_ACTIVE_RUNS, + ARG_BACKFILL_DRY_RUN, ), ), ) diff --git a/airflow/cli/commands/backfill_command.py b/airflow/cli/commands/backfill_command.py index 8714ed5585004..378c6ea5f9502 100644 --- a/airflow/cli/commands/backfill_command.py +++ b/airflow/cli/commands/backfill_command.py @@ -21,10 +21,31 @@ import signal from airflow import settings -from airflow.models.backfill import _create_backfill +from airflow.models.backfill import _create_backfill, _get_info_list +from airflow.models.serialized_dag import SerializedDagModel from airflow.utils import cli as cli_utils from airflow.utils.cli import sigint_handler from airflow.utils.providers_configuration_loader import providers_configuration_loaded +from airflow.utils.session import create_session + + +def _do_dry_run(*, params, dag_id, from_date, to_date, reverse): + print("Performing dry run of backfill.") + print("Printing params:") + for k, v in params.items(): + print(f" - {k} = {v}") + with create_session() as session: + serdag = session.get(SerializedDagModel, dag_id) + + info_list = _get_info_list( + dag=serdag.dag, + from_date=from_date, + to_date=to_date, + reverse=reverse, + ) + print("Logical dates to be attempted:") + for info in info_list: + print(f" - {info.logical_date}") @cli_utils.action_cli @@ -34,6 +55,22 @@ def create_backfill(args) -> None: logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT) signal.signal(signal.SIGTERM, sigint_handler) + if args.dry_run: + _do_dry_run( + params=dict( + dag_id=args.dag, + from_date=args.from_date, + to_date=args.to_date, + max_active_runs=args.max_active_runs, + reverse=args.run_backwards, + dag_run_conf=args.dag_run_conf, + ), + dag_id=args.dag, + from_date=args.from_date, + to_date=args.to_date, + reverse=args.run_backwards, + ) + return _create_backfill( dag_id=args.dag, from_date=args.from_date, diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 37a9533113067..53a9fca1df11a 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -175,6 +175,13 @@ def _create_backfill_dag_run(dag, info, backfill_id, dag_run_conf, backfill_sort ) +def _get_info_list(*, dag, from_date, to_date, reverse): + dagrun_info_list = dag.iter_dagrun_infos_between(from_date, to_date) + if reverse: + dagrun_info_list = reversed([x for x in dag.iter_dagrun_infos_between(from_date, to_date)]) + return dagrun_info_list + + def _create_backfill( *, dag_id: str, @@ -200,6 +207,14 @@ def _create_backfill( f"There can be only one running backfill per dag." ) + dag = serdag.dag + depends_on_past = any(x.depends_on_past for x in dag.tasks) + if depends_on_past: + if reverse is True: + raise ValueError( + "Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True" + ) + br = Backfill( dag_id=dag_id, from_date=from_date, @@ -210,18 +225,8 @@ def _create_backfill( session.add(br) session.commit() - dag = serdag.dag - depends_on_past = any(x.depends_on_past for x in dag.tasks) - if depends_on_past: - if reverse is True: - raise ValueError( - "Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True" - ) - backfill_sort_ordinal = 0 - dagrun_info_list = dag.iter_dagrun_infos_between(from_date, to_date) - if reverse: - dagrun_info_list = reversed([x for x in dag.iter_dagrun_infos_between(from_date, to_date)]) + dagrun_info_list = _get_info_list(dag=dag, from_date=from_date, to_date=to_date, reverse=reverse) for info in dagrun_info_list: backfill_sort_ordinal += 1 session.commit()