Skip to content

Commit

Permalink
Implement basic backfill dry run (apache#43241)
Browse files Browse the repository at this point in the history
Add simple dry-run functionality for backfill. The dry run does not check whether the listed runs already exist; this is only the basic implementation for now. Sample console output:
  • Loading branch information
dstandish authored and harjeevanmaan committed Oct 23, 2024
1 parent 835219d commit d01fa0f
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 12 deletions.
6 changes: 6 additions & 0 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ def string_lower_type(val):
type=positive_int(allow_zero=False),
help="Max active runs for this backfill.",
)
# Boolean ``--dry-run`` switch (``store_true``) requesting a dry run of the backfill.
ARG_BACKFILL_DRY_RUN = Arg(
    ("--dry-run",),
    action="store_true",
    help="Perform a dry run",
)


# misc
Expand Down Expand Up @@ -1030,6 +1035,7 @@ class GroupCommand(NamedTuple):
ARG_DAG_RUN_CONF,
ARG_RUN_BACKWARDS,
ARG_MAX_ACTIVE_RUNS,
ARG_BACKFILL_DRY_RUN,
),
),
)
Expand Down
39 changes: 38 additions & 1 deletion airflow/cli/commands/backfill_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,31 @@
import signal

from airflow import settings
from airflow.models.backfill import _create_backfill
from airflow.models.backfill import _create_backfill, _get_info_list
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils
from airflow.utils.cli import sigint_handler
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import create_session


def _do_dry_run(*, params, dag_id, from_date, to_date, reverse):
    """
    Print what a backfill would attempt, without creating anything.

    Echoes the supplied parameters, looks up the serialized dag for ``dag_id``
    and prints the logical date of every run info returned by
    ``_get_info_list`` for the given date range.

    :param params: mapping of parameter name to value; printed as given
    :param dag_id: id used to look up the ``SerializedDagModel`` row
    :param from_date: start of the range passed to ``_get_info_list``
    :param to_date: end of the range passed to ``_get_info_list``
    :param reverse: passed to ``_get_info_list``; when true the dates are
        listed in reverse order
    :raises SystemExit: if no serialized dag is found for ``dag_id``
    """
    print("Performing dry run of backfill.")
    print("Printing params:")
    for k, v in params.items():
        print(f" - {k} = {v}")
    with create_session() as session:
        serdag = session.get(SerializedDagModel, dag_id)

    if serdag is None:
        # ``session.get`` yields None for an unknown id; fail with a readable
        # message instead of an AttributeError on ``serdag.dag`` below.
        raise SystemExit(f"Could not find dag {dag_id}")

    info_list = _get_info_list(
        dag=serdag.dag,
        from_date=from_date,
        to_date=to_date,
        reverse=reverse,
    )
    print("Logical dates to be attempted:")
    for info in info_list:
        print(f" - {info.logical_date}")


@cli_utils.action_cli
Expand All @@ -34,6 +55,22 @@ def create_backfill(args) -> None:
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
signal.signal(signal.SIGTERM, sigint_handler)

if args.dry_run:
_do_dry_run(
params=dict(
dag_id=args.dag,
from_date=args.from_date,
to_date=args.to_date,
max_active_runs=args.max_active_runs,
reverse=args.run_backwards,
dag_run_conf=args.dag_run_conf,
),
dag_id=args.dag,
from_date=args.from_date,
to_date=args.to_date,
reverse=args.run_backwards,
)
return
_create_backfill(
dag_id=args.dag,
from_date=args.from_date,
Expand Down
27 changes: 16 additions & 11 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ def _create_backfill_dag_run(dag, info, backfill_id, dag_run_conf, backfill_sort
)


def _get_info_list(*, dag, from_date, to_date, reverse):
dagrun_info_list = dag.iter_dagrun_infos_between(from_date, to_date)
if reverse:
dagrun_info_list = reversed([x for x in dag.iter_dagrun_infos_between(from_date, to_date)])
return dagrun_info_list


def _create_backfill(
*,
dag_id: str,
Expand All @@ -200,6 +207,14 @@ def _create_backfill(
f"There can be only one running backfill per dag."
)

dag = serdag.dag
depends_on_past = any(x.depends_on_past for x in dag.tasks)
if depends_on_past:
if reverse is True:
raise ValueError(
"Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True"
)

br = Backfill(
dag_id=dag_id,
from_date=from_date,
Expand All @@ -210,18 +225,8 @@ def _create_backfill(
session.add(br)
session.commit()

dag = serdag.dag
depends_on_past = any(x.depends_on_past for x in dag.tasks)
if depends_on_past:
if reverse is True:
raise ValueError(
"Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True"
)

backfill_sort_ordinal = 0
dagrun_info_list = dag.iter_dagrun_infos_between(from_date, to_date)
if reverse:
dagrun_info_list = reversed([x for x in dag.iter_dagrun_infos_between(from_date, to_date)])
dagrun_info_list = _get_info_list(dag=dag, from_date=from_date, to_date=to_date, reverse=reverse)
for info in dagrun_info_list:
backfill_sort_ordinal += 1
session.commit()
Expand Down

0 comments on commit d01fa0f

Please sign in to comment.