Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions airflow-core/docs/core-concepts/params.rst
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,55 @@ Disabling Runtime Param Modification

The ability to update params while triggering a Dag depends on the flag ``core.dag_run_conf_overrides_params``.
Setting this config to ``False`` will effectively turn your default params into constants.

Pre-populating Trigger Form via URL
-----------------------------------

To pre-populate values in the form when publishing a link to the trigger form you can call the trigger URL ``/dags/<dag_name>/trigger/single`` or ``/dags/<dag_name>/trigger/backfill``,
and add query parameters to the URL.

There are two trigger form URLs available, each supporting a different set of query parameters:

* ``/trigger/single``:
- ``conf`` - JSON configuration.
- ``run_id`` - run identifier.
- ``logical_date`` - execution date in ``YYYY-MM-DDTHH:mm:ss.SSS`` format. Defaults to the current timestamp if not provided.
- ``note`` - note attached to the DAG run.

* ``/trigger/backfill``:
- ``conf`` - JSON configuration, applied to all runs.
- ``from_date`` - start of the backfill window in ``YYYY-MM-DDTHH:mm:ss`` format.
- ``to_date`` - end of the backfill window in ``YYYY-MM-DDTHH:mm:ss`` format.
- ``max_active_runs`` - maximum concurrent runs. Defaults to ``1``.
- ``reprocess_behavior`` - determines how existing runs are reprocessed. Supported values are:

* ``failed`` - Missing and Errored Runs
* ``completed`` - All Runs
* ``none`` - Missing Runs

- ``run_backwards`` - if set to true, the backfill is scheduled in reverse order. Defaults to ``false``.

The trigger form supports two different ways of providing ``conf`` values. The available input methods are summarized in the table below:

.. list-table:: ``conf`` parameter usage
:header-rows: 1
:widths: 15 35 55

* - Form
- Usage
- Example
* - JSON (explicit)
- Provide the entire configuration as a JSON object.
This form has higher priority if present.
- ``/dags/{dag_id}/trigger/single?conf={"foo":"bar","x":123}``
* - Key-value (implicit)
- If ``conf`` is not specified, any query parameter that is not a reserved keyword
will be automatically collected into ``conf``.
- ``/dags/{dag_id}/trigger/single?run_id=myrun&foo=bar&x=123``
results in ``conf={"foo":"bar","x":"123"}``

For example, you can pass the pathname and query like below:

``/dags/{dag_id}/trigger/single?run_id=my_run_dag&logical_date=2025-09-06T12:34:56.789&conf={"foo":"bar"}&note=run_note``

``/dags/{dag_id}/trigger/backfill?from_date=2025-09-01T00:00:00&to_date=2025-09-03T23:59:59&conf={"abc":"loo"}&max_active_runs=2&reprocess_behavior=failed&run_backwards=true``
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/ui/src/components/ConfigForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ConfigFormProps<T extends FieldValues = FieldValues> = {
date?: unknown;
};
readonly initialParamsDict: { paramsDict: ParamsSpec };
readonly openAdvanced?: boolean;
readonly setErrors: React.Dispatch<
React.SetStateAction<{
conf?: string;
Expand All @@ -49,6 +50,7 @@ const ConfigForm = <T extends FieldValues = FieldValues>({
control,
errors,
initialParamsDict,
openAdvanced = false,
setErrors,
setFormError,
}: ConfigFormProps<T>) => {
Expand Down Expand Up @@ -83,7 +85,9 @@ const ConfigForm = <T extends FieldValues = FieldValues>({
return (
<Accordion.Root
collapsible
defaultValue={[flexibleFormDefaultSection]}
defaultValue={
openAdvanced ? [flexibleFormDefaultSection, "advancedOptions"] : [flexibleFormDefaultSection]
}
mb={4}
overflow="visible"
size="lg"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,27 @@ import dayjs from "dayjs";
import { useEffect, useState } from "react";
import { Controller, useForm, useWatch } from "react-hook-form";
import { useTranslation } from "react-i18next";
import { useSearchParams } from "react-router-dom";

import type { BackfillPostBody, DAGResponse, DAGWithLatestDagRunsResponse } from "openapi/requests/types.gen";
import type {
DAGResponse,
DAGWithLatestDagRunsResponse,
BackfillPostBody,
ReprocessBehavior,
} from "openapi/requests/types.gen";
import { RadioCardItem, RadioCardLabel, RadioCardRoot } from "src/components/ui/RadioCard";
import { reprocessBehaviors } from "src/constants/reprocessBehaviourParams";
import { useCreateBackfill } from "src/queries/useCreateBackfill";
import { useCreateBackfillDryRun } from "src/queries/useCreateBackfillDryRun";
import { useDagParams } from "src/queries/useDagParams";
import { useParamStore } from "src/queries/useParamStore";
import { useTogglePause } from "src/queries/useTogglePause";
import { getTriggerConf } from "src/utils/trigger";
import type { DagRunTriggerParams } from "src/utils/trigger";

import ConfigForm from "../ConfigForm";
import { DateTimeInput } from "../DateTimeInput";
import { ErrorAlert } from "../ErrorAlert";
import type { DagRunTriggerParams } from "../TriggerDag/TriggerDAGForm";
import { Checkbox } from "../ui/Checkbox";
import { getInlineMessage } from "./inlineMessage";

Expand All @@ -52,16 +59,19 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
const [formError, setFormError] = useState(false);
const initialParamsDict = useDagParams(dag.dag_id, true);
const { conf } = useParamStore();
const { control, handleSubmit, reset, watch } = useForm<BackfillFormProps>({
const [searchParams] = useSearchParams();
const reservedKeys = ["from_date", "to_date", "max_active_runs", "reprocess_behavior", "run_backwards"];
const urlConf = getTriggerConf(searchParams, reservedKeys);
const { control, handleSubmit, reset } = useForm<BackfillFormProps>({
defaultValues: {
conf,
conf: urlConf === "{}" ? conf || "{}" : urlConf,
dag_id: dag.dag_id,
from_date: "",
max_active_runs: 1,
reprocess_behavior: "none",
run_backwards: false,
from_date: searchParams.get("from_date") ?? "",
max_active_runs: parseInt(searchParams.get("max_active_runs") ?? "1", 10) || 1,
reprocess_behavior: (searchParams.get("reprocess_behavior") ?? "none") as ReprocessBehavior,
run_backwards: searchParams.get("run_backwards") === "true",
run_on_latest_version: true,
to_date: "",
to_date: searchParams.get("to_date") ?? "",
},
mode: "onBlur",
});
Expand Down Expand Up @@ -91,16 +101,13 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
if (Boolean(dateValidationError)) {
setErrors((prev) => ({ ...prev, date: dateValidationError }));
}
}, [dateValidationError]);
useEffect(() => {
if (conf) {
reset((prevValues) => ({ ...prevValues, conf }));
if (Boolean(conf) && urlConf === "{}") {
reset((prev) => ({ ...prev, conf }));
}
}, [conf, reset]);
const dataIntervalStart = watch("from_date");
const dataIntervalEnd = watch("to_date");
const noDataInterval = !Boolean(dataIntervalStart) || !Boolean(dataIntervalEnd);
const dataIntervalInvalid = dayjs(dataIntervalStart).isAfter(dayjs(dataIntervalEnd));
}, [dateValidationError, conf, reset, urlConf]);

const noDataInterval = !Boolean(values.from_date) || !Boolean(values.to_date);
const dataIntervalInvalid = dayjs(values.from_date).isAfter(dayjs(values.to_date));

const onSubmit = (fdata: BackfillFormProps) => {
if (unpause && dag.is_paused) {
Expand Down Expand Up @@ -239,6 +246,10 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
control={control}
errors={errors}
initialParamsDict={initialParamsDict}
openAdvanced={
urlConf !== "{}" ||
["max_active_runs", "reprocess_behavior", "run_backwards"].some((key) => searchParams.has(key))
}
setErrors={setErrors}
setFormError={setFormError}
/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import { Input, Field, Stack } from "@chakra-ui/react";
import { Controller, type Control } from "react-hook-form";
import { useTranslation } from "react-i18next";

import type { DagRunTriggerParams } from "src/utils/trigger";

import EditableMarkdown from "./EditableMarkdown";
import type { DagRunTriggerParams } from "./TriggerDAGForm";

type TriggerDAGAdvancedOptionsProps = {
readonly control: Control<DagRunTriggerParams>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,26 @@
*/
import { Button, Box, Spacer, HStack, Field, Stack, Text, VStack } from "@chakra-ui/react";
import dayjs from "dayjs";
import { useEffect, useState } from "react";
import { useEffect, useState, useRef, useMemo } from "react";
import { Controller, useForm } from "react-hook-form";
import { useTranslation } from "react-i18next";
import { FiPlay } from "react-icons/fi";
import { useSearchParams } from "react-router-dom";

import { useDagParams } from "src/queries/useDagParams";
import { useParamStore } from "src/queries/useParamStore";
import { useTogglePause } from "src/queries/useTogglePause";
import { useTrigger } from "src/queries/useTrigger";
import { DEFAULT_DATETIME_FORMAT } from "src/utils/datetimeUtils";
import {
getTriggerConf,
mergeUrlParams,
getUpdatedParamsDict,
type DagRunTriggerParams,
dataIntervalModeOptions,
extractParamValues,
type TriggerDAGFormProps,
} from "src/utils/trigger";

import ConfigForm from "../ConfigForm";
import { DateTimeInput } from "../DateTimeInput";
Expand All @@ -36,33 +46,6 @@ import { Checkbox } from "../ui/Checkbox";
import { RadioCardItem, RadioCardRoot } from "../ui/RadioCard";
import TriggerDAGAdvancedOptions from "./TriggerDAGAdvancedOptions";

type TriggerDAGFormProps = {
readonly dagDisplayName: string;
readonly dagId: string;
readonly hasSchedule: boolean;
readonly isPaused: boolean;
readonly onClose: () => void;
readonly open: boolean;
};

type DataIntervalMode = "auto" | "manual";

export type DagRunTriggerParams = {
conf: string;
dagRunId: string;
dataIntervalEnd: string;
dataIntervalMode: DataIntervalMode;
dataIntervalStart: string;
logicalDate: string;
note: string;
partitionKey: string | undefined;
};

const dataIntervalModeOptions: Array<{ label: string; value: DataIntervalMode }> = [
{ label: "components:triggerDag.dataIntervalAuto", value: "auto" },
{ label: "components:triggerDag.dataIntervalManual", value: "manual" },
];

const TriggerDAGForm = ({
dagDisplayName,
dagId,
Expand All @@ -76,38 +59,84 @@ const TriggerDAGForm = ({
const [formError, setFormError] = useState(false);
const initialParamsDict = useDagParams(dagId, open);
const { error: errorTrigger, isPending, triggerDagRun } = useTrigger({ dagId, onSuccessConfirm: onClose });
const { conf } = useParamStore();
const { conf, setParamsDict } = useParamStore();
const [unpause, setUnpause] = useState(true);
const [searchParams] = useSearchParams();
const urlConf = getTriggerConf(searchParams, ["run_id", "logical_date", "note"]);
const urlRunId = searchParams.get("run_id") ?? "";
const urlDate = searchParams.get("logical_date");
const urlNote = searchParams.get("note") ?? "";

const { mutate: togglePause } = useTogglePause({ dagId });

const { control, handleSubmit, reset, watch } = useForm<DagRunTriggerParams>({
const defaultsRef = useRef<DagRunTriggerParams | undefined>(undefined);
const isSyncedRef = useRef(false);

const cleanInitialParams = useMemo(
() => extractParamValues(initialParamsDict.paramsDict as Record<string, unknown>),
[initialParamsDict.paramsDict],
);
const { control, getValues, handleSubmit, reset, watch } = useForm<DagRunTriggerParams>({
defaultValues: {
conf,
dagRunId: "",
...initialParamsDict,
conf: urlConf === "{}" ? conf || "{}" : urlConf,
dagRunId: urlRunId,
dataIntervalEnd: "",
dataIntervalMode: "auto",
dataIntervalStart: "",
// Default logical date to now, show it in the selected timezone
logicalDate: dayjs().format(DEFAULT_DATETIME_FORMAT),
note: "",
logicalDate: urlDate ?? dayjs().format(DEFAULT_DATETIME_FORMAT),
note: urlNote,
params: cleanInitialParams,
partitionKey: undefined,
},
});

// Automatically reset form when conf is fetched
useEffect(() => {
if (conf) {
reset((prevValues) => ({
...prevValues,
conf,
}));
if (defaultsRef.current === undefined && Object.keys(cleanInitialParams).length > 0) {
const current = getValues();

defaultsRef.current = {
...current,
params: cleanInitialParams,
};
}
}, [conf, reset]);
}, [getValues, cleanInitialParams]);

const resetDateError = () => {
setErrors((prev) => ({ ...prev, date: undefined }));
};
useEffect(() => {
if (defaultsRef.current === undefined) {
return;
}

if (isSyncedRef.current) {
return;
}

if (urlConf === "{}") {
if (conf) {
reset((prev) => ({ ...prev, conf }));
}
isSyncedRef.current = true;

return;
}

const mergedValues = mergeUrlParams(urlConf, defaultsRef.current.params ?? {});

reset({
...defaultsRef.current,
conf: JSON.stringify(mergedValues, undefined, 2),
dagRunId: urlRunId || defaultsRef.current.dagRunId,
logicalDate: urlDate ?? defaultsRef.current.logicalDate,
note: urlNote || defaultsRef.current.note,
});

setParamsDict(getUpdatedParamsDict(initialParamsDict.paramsDict, mergedValues));
isSyncedRef.current = true;
}, [urlConf, urlRunId, urlDate, urlNote, initialParamsDict, reset, setParamsDict, conf]);

const resetDateError = () => setErrors((prev) => ({ ...prev, date: undefined }));

const dataIntervalMode = watch("dataIntervalMode");
const dataIntervalStart = watch("dataIntervalStart");
Expand All @@ -116,17 +145,17 @@ const TriggerDAGForm = ({
const dataIntervalInvalid =
dataIntervalMode === "manual" &&
(noDataInterval || dayjs(dataIntervalStart).isAfter(dayjs(dataIntervalEnd)));

const onSubmit = (data: DagRunTriggerParams) => {
if (unpause && isPaused) {
togglePause({
dagId,
requestBody: {
is_paused: false,
},
});
togglePause({ dagId, requestBody: { is_paused: false } });
}
triggerDagRun(data);

const finalParams = mergeUrlParams(data.conf, data.params ?? {});

triggerDagRun({
...data,
conf: JSON.stringify(finalParams),
});
};

return (
Expand Down Expand Up @@ -219,6 +248,7 @@ const TriggerDAGForm = ({
control={control}
errors={errors}
initialParamsDict={initialParamsDict}
openAdvanced={urlConf !== "{}" || Boolean(urlRunId) || Boolean(urlDate) || Boolean(urlNote)}
setErrors={setErrors}
setFormError={setFormError}
>
Expand Down
Loading
Loading