Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class BackfillPostBody(StrictBaseModel):
dag_run_conf: dict = {}
reprocess_behavior: ReprocessBehavior = ReprocessBehavior.NONE
max_active_runs: int = 10
run_on_latest_version: bool = True


class BackfillResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8770,6 +8770,10 @@ components:
type: integer
title: Max Active Runs
default: 10
run_on_latest_version:
type: boolean
title: Run On Latest Version
default: true
additionalProperties: false
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ def create_backfill(
dag_run_conf=backfill_request.dag_run_conf,
triggering_user_name=user.get_name(),
reprocess_behavior=backfill_request.reprocess_behavior,
run_on_latest_version=backfill_request.run_on_latest_version,
)
return BackfillResponse.model_validate(backfill_obj)

Expand Down
5 changes: 3 additions & 2 deletions airflow-core/src/airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,11 @@ def string_lower_type(val):
ARG_BACKFILL_RUN_ON_LATEST_VERSION = Arg(
("--run-on-latest-version",),
help=(
"(Experimental) If set, the backfill will run tasks using the latest bundle version instead of "
"the version that was active when the original Dag run was created."
"(Experimental) The backfill will run tasks using the latest bundle version instead of "
"the version that was active when the original Dag run was created. Defaults to True."
),
action="store_true",
default=True,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ export const $BackfillPostBody = {
type: 'integer',
title: 'Max Active Runs',
default: 10
},
run_on_latest_version: {
type: 'boolean',
title: 'Run On Latest Version',
default: true
}
},
additionalProperties: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export type BackfillPostBody = {
};
reprocess_behavior?: ReprocessBehavior;
max_active_runs?: number;
run_on_latest_version?: boolean;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ type RunBackfillFormProps = {
readonly dag: DAGResponse | DAGWithLatestDagRunsResponse;
readonly onClose: () => void;
};
const today = new Date().toISOString().slice(0, 16);

type BackfillFormProps = DagRunTriggerParams & Omit<BackfillPostBody, "dag_run_conf">;
const today = new Date().toISOString().slice(0, 16);

const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
const { t: translate } = useTranslation(["components", "common"]);
Expand All @@ -62,6 +61,7 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
max_active_runs: 1,
reprocess_behavior: "none",
run_backwards: false,
run_on_latest_version: true,
to_date: "",
},
mode: "onBlur",
Expand All @@ -78,6 +78,7 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
max_active_runs: values.max_active_runs ?? 1,
reprocess_behavior: values.reprocess_behavior,
run_backwards: values.run_backwards ?? false,
run_on_latest_version: values.run_on_latest_version ?? true,
to_date: values.to_date ?? "",
},
},
Expand All @@ -92,16 +93,11 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
setErrors((prev) => ({ ...prev, date: dateValidationError }));
}
}, [dateValidationError]);

useEffect(() => {
if (conf) {
reset((prevValues) => ({
...prevValues,
conf,
}));
reset((prevValues) => ({ ...prevValues, conf }));
}
}, [conf, reset]);

const dataIntervalStart = watch("from_date");
const dataIntervalEnd = watch("to_date");
const noDataInterval = !Boolean(dataIntervalStart) || !Boolean(dataIntervalEnd);
Expand All @@ -128,16 +124,8 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
reset(fdata);
onClose();
};

const resetDateError = () => {
setErrors((prev) => ({ ...prev, date: undefined }));
};

const affectedTasks = data ?? {
backfills: [],
total_entries: 0,
};

const resetDateError = () => setErrors((prev) => ({ ...prev, date: undefined }));
const affectedTasks = data ?? { backfills: [], total_entries: 0 };
const inlineMessage = getInlineMessage(isPendingDryRun, affectedTasks.total_entries, translate);

return (
Expand Down Expand Up @@ -178,12 +166,7 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
control={control}
name="reprocess_behavior"
render={({ field }) => (
<RadioCardRoot
defaultValue={field.value}
onChange={(event) => {
field.onChange(event);
}}
>
<RadioCardRoot defaultValue={field.value} onChange={field.onChange}>
<RadioCardLabel fontSize="md" fontWeight="semibold" mb={3}>
{translate("backfill.reprocessBehavior")}
</RadioCardLabel>
Expand Down Expand Up @@ -230,6 +213,16 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
)}
/>
<Spacer />
<Controller
control={control}
name="run_on_latest_version"
render={({ field }) => (
<Checkbox checked={field.value} colorPalette="brand" onChange={field.onChange}>
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
)}
/>
<Spacer />
{dag.is_paused ? (
<>
<Checkbox
Expand All @@ -243,7 +236,6 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
<Spacer />
</>
) : undefined}

<ConfigForm
control={control}
errors={errors}
Expand Down
6 changes: 3 additions & 3 deletions airflow-core/tests/unit/cli/commands/test_backfill_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def test_backfill(self, mock_create, repro, expected_repro):
dag_run_conf=None,
reprocess_behavior=expected_repro,
triggering_user_name="root",
run_on_latest_version=False,
run_on_latest_version=True,
)

@mock.patch("airflow.cli.commands.backfill_command._create_backfill")
Expand Down Expand Up @@ -189,7 +189,7 @@ def test_backfill_with_dag_run_conf(self, mock_create):
dag_run_conf={"example_key": "example_value"},
reprocess_behavior=None,
triggering_user_name="root",
run_on_latest_version=False,
run_on_latest_version=True,
)

def test_backfill_with_invalid_dag_run_conf(self):
Expand Down Expand Up @@ -235,5 +235,5 @@ def test_backfill_with_empty_dag_run_conf(self, mock_create):
dag_run_conf={},
reprocess_behavior=None,
triggering_user_name="root",
run_on_latest_version=False,
run_on_latest_version=True,
)
1 change: 1 addition & 0 deletions airflow-ctl/src/airflowctl/api/datamodels/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,7 @@ class BackfillPostBody(BaseModel):
dag_run_conf: Annotated[dict[str, Any] | None, Field(title="Dag Run Conf")] = {}
reprocess_behavior: ReprocessBehavior | None = "none"
max_active_runs: Annotated[int | None, Field(title="Max Active Runs")] = 10
run_on_latest_version: Annotated[bool | None, Field(title="Run On Latest Version")] = True


class BackfillResponse(BaseModel):
Expand Down