From cc54c2d2287ac1606a317055c6e7ae8e9e600fb0 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 25 Jan 2023 00:19:33 +0100 Subject: [PATCH 01/14] add params argument for dag_run and deprecate conf --- airflow/jobs/backfill_job.py | 5 +++++ airflow/models/dag.py | 18 ++++++++++++++++++ airflow/models/dagrun.py | 3 +++ airflow/models/param.py | 2 ++ 4 files changed, 28 insertions(+) diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py index ee37b9d51026e..32fdbb3454219 100644 --- a/airflow/jobs/backfill_job.py +++ b/airflow/jobs/backfill_job.py @@ -115,6 +115,7 @@ def __init__( delay_on_limit_secs=1.0, verbose=False, conf=None, + params=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False, @@ -137,6 +138,7 @@ def __init__( :param delay_on_limit_secs: :param verbose: :param conf: a dictionary which user could pass k-v pairs for backfill + :param params: a dictionary which user could pass k-v pairs for backfill :param rerun_failed_tasks: flag to whether to auto rerun the failed task in backfill :param run_backwards: Whether to process the dates from most to least recent @@ -157,6 +159,7 @@ def __init__( self.delay_on_limit_secs = delay_on_limit_secs self.verbose = verbose self.conf = conf + self.params = params self.rerun_failed_tasks = rerun_failed_tasks self.run_backwards = run_backwards self.run_at_least_once = run_at_least_once @@ -321,6 +324,7 @@ def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = Non respect_dag_max_active_limit = False # Fixes --conf overwrite for backfills with already existing DagRuns run.conf = self.conf or {} + run.params = self.params or {} # start_date is cleared for existing DagRuns run.start_date = timezone.utcnow() else: @@ -339,6 +343,7 @@ def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = Non external_trigger=False, session=session, conf=self.conf, + params=self.params, run_type=DagRunType.BACKFILL_JOB, creating_job_id=self.id, ) diff 
--git a/airflow/models/dag.py b/airflow/models/dag.py index b92bc5ad96837..e68202d1c7200 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2399,6 +2399,7 @@ def run( delay_on_limit_secs=1.0, verbose=False, conf=None, + params=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False, @@ -2422,6 +2423,7 @@ def run( dag run when max_active_runs limit has been reached :param verbose: Make logging output more verbose :param conf: user defined dictionary passed from CLI + :param params: user defined dictionary passed from CLI :param rerun_failed_tasks: :param run_backwards: :param run_at_least_once: If true, always run the DAG at least once even @@ -2450,6 +2452,7 @@ def run( delay_on_limit_secs=delay_on_limit_secs, verbose=verbose, conf=conf, + params=params, rerun_failed_tasks=rerun_failed_tasks, run_backwards=run_backwards, run_at_least_once=run_at_least_once, @@ -2473,6 +2476,7 @@ def test( self, execution_date: datetime | None = None, run_conf: dict[str, Any] | None = None, + params: dict[str, Any] | None = None, conn_file_path: str | None = None, variable_file_path: str | None = None, session: Session = NEW_SESSION, @@ -2482,6 +2486,7 @@ def test( :param execution_date: execution date for the DAG run :param run_conf: configuration to pass to newly created dagrun + :param params: parameters to pass to newly created dagrun :param conn_file_path: file path to a connection file in either yaml or json :param variable_file_path: file path to a variable file in either yaml or json :param session: database connection (optional) @@ -2527,6 +2532,7 @@ def add_logger_if_needed(ti: TaskInstance): run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date), session=session, conf=run_conf, + params=params, ) tasks = self.task_dict @@ -2553,6 +2559,7 @@ def create_dagrun( start_date: datetime | None = None, external_trigger: bool | None = False, conf: dict | None = None, + params: dict | None = None, run_type: DagRunType | None = None, 
session: Session = NEW_SESSION, dag_hash: str | None = None, @@ -2570,6 +2577,7 @@ def create_dagrun( :param start_date: the date this dag run should be evaluated :param external_trigger: whether this dag run is externally triggered :param conf: Dict containing configuration/parameters to pass to the DAG + :param params: Dict containing parameters to pass to the DAG :param creating_job_id: id of the job creating this DagRun :param session: database session :param dag_hash: Hash of Serialized DAG @@ -2628,9 +2636,15 @@ def create_dagrun( stacklevel=3, ) + if conf: + warnings.warn( + "dag_run conf is deprecated. Please use params instead", DeprecationWarning, stacklevel=2 + ) + # create a copy of params before validating copied_params = copy.deepcopy(self.params) copied_params.update(conf or {}) + copied_params.update(params or {}) copied_params.validate() run = DagRun( @@ -2640,6 +2654,7 @@ def create_dagrun( start_date=start_date, external_trigger=external_trigger, conf=conf, + params=params, state=state, run_type=run_type, dag_hash=dag_hash, @@ -3654,6 +3669,7 @@ def _run_task(ti: TaskInstance, session): def _get_or_create_dagrun( dag: DAG, conf: dict[Any, Any] | None, + params: dict[Any, Any] | None, start_date: datetime, execution_date: datetime, run_id: str, @@ -3664,6 +3680,7 @@ def _get_or_create_dagrun( This function is only meant for the `dag.test` function as a helper function. 
:param dag: Dag to be used to find dagrun :param conf: configuration to pass to newly created dagrun + :param params: parameters to pass to newly created dagrun :param start_date: start date of new dagrun, defaults to execution_date :param execution_date: execution_date for finding the dagrun :param run_id: run_id to pass to new dagrun @@ -3686,6 +3703,7 @@ def _get_or_create_dagrun( start_date=start_date or execution_date, session=session, conf=conf, # type: ignore + params=params, ) log.info("created dagrun " + str(dr)) return dr diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 2c736c4c2efe5..fd9a8dd63bb44 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -118,6 +118,7 @@ class DagRun(Base, LoggingMixin): external_trigger = Column(Boolean, default=True) run_type = Column(String(50), nullable=False) conf = Column(PickleType) + params = Column(PickleType) # These two must be either both NULL or both datetime. data_interval_start = Column(UtcDateTime) data_interval_end = Column(UtcDateTime) @@ -193,6 +194,7 @@ def __init__( start_date: datetime | None = None, external_trigger: bool | None = None, conf: Any | None = None, + params: Any | None = None, state: DagRunState | None = None, run_type: str | None = None, dag_hash: str | None = None, @@ -211,6 +213,7 @@ def __init__( self.start_date = start_date self.external_trigger = external_trigger self.conf = conf or {} + self.params = params or {} if state is not None: self.state = state if queued_at is NOTSET: diff --git a/airflow/models/param.py b/airflow/models/param.py index 2f93f7dd88341..45c965a0fa7e6 100644 --- a/airflow/models/param.py +++ b/airflow/models/param.py @@ -316,4 +316,6 @@ def process_params( if conf.getboolean("core", "dag_run_conf_overrides_params") and dag_run and dag_run.conf: logger.debug("Updating task params (%s) with DagRun.conf (%s)", params, dag_run.conf) params.update(dag_run.conf) + if dag_run and dag_run.params: + params.update(dag_run.params) 
return params.validate() From 816bf5c2021bb1b8efa1821d13358843ef915f19 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 25 Jan 2023 00:20:23 +0100 Subject: [PATCH 02/14] replace conf by params in Airflow UI --- airflow/www/api/experimental/endpoints.py | 12 ++-- airflow/www/forms.py | 6 +- airflow/www/static/js/api/useGridData.test.ts | 4 +- .../static/js/dag/details/dagRun/index.tsx | 14 ++-- .../static/js/dag/grid/dagRuns/index.test.tsx | 12 ++-- airflow/www/static/js/dag/grid/index.test.tsx | 4 +- airflow/www/static/js/types/index.ts | 4 +- airflow/www/static/js/utils/index.test.ts | 4 +- airflow/www/templates/airflow/dag.html | 2 +- airflow/www/templates/airflow/dags.html | 2 +- airflow/www/templates/airflow/trigger.html | 13 +--- airflow/www/utils.py | 24 +++---- airflow/www/views.py | 64 +++++++++---------- 13 files changed, 77 insertions(+), 88 deletions(-) diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py index 012e1b2aa32a3..d23f294ce7c38 100644 --- a/airflow/www/api/experimental/endpoints.py +++ b/airflow/www/api/experimental/endpoints.py @@ -89,11 +89,11 @@ def trigger_dag(dag_id): if "run_id" in data: run_id = data["run_id"] - conf = None - if "conf" in data: - conf = data["conf"] - if not isinstance(conf, dict): - error_message = "Dag Run conf must be a dictionary object, other types are not supported" + params = None + if "params" in data: + params = data["params"] + if not isinstance(params, dict): + error_message = "Dag Run params must be a dictionary object, other types are not supported" log.error(error_message) response = jsonify({"error": error_message}) response.status_code = 400 @@ -122,7 +122,7 @@ def trigger_dag(dag_id): replace_microseconds = to_boolean(data["replace_microseconds"]) try: - dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date, replace_microseconds) + dr = trigger.trigger_dag(dag_id, run_id, params, execution_date, replace_microseconds) except AirflowException 
as err: log.error(err) response = jsonify(error=f"{err}") diff --git a/airflow/www/forms.py b/airflow/www/forms.py index 78ea1a856543a..c90da3706d277 100644 --- a/airflow/www/forms.py +++ b/airflow/www/forms.py @@ -137,15 +137,15 @@ class DagRunEditForm(DynamicForm): lazy_gettext("Logical Date"), widget=AirflowDateTimePickerROWidget(), ) - conf = TextAreaField(lazy_gettext("Conf"), widget=BS3TextAreaROWidget()) + params = TextAreaField(lazy_gettext("Params"), widget=BS3TextAreaROWidget()) note = TextAreaField(lazy_gettext("User Note"), widget=BS3TextAreaFieldWidget()) def populate_obj(self, item): """Populates the attributes of the passed obj with data from the form's fields.""" super().populate_obj(item) item.run_type = DagRunType.from_run_id(item.run_id) - if item.conf: - item.conf = json.loads(item.conf) + if item.params: + item.params = json.loads(item.params) class TaskInstanceEditForm(DynamicForm): diff --git a/airflow/www/static/js/api/useGridData.test.ts b/airflow/www/static/js/api/useGridData.test.ts index d84abf67e6963..2fd78aa3ba268 100644 --- a/airflow/www/static/js/api/useGridData.test.ts +++ b/airflow/www/static/js/api/useGridData.test.ts @@ -33,8 +33,8 @@ const commonDagRunParams = { endDate: null, lastSchedulingDecision: null, externalTrigger: false, - conf: null, - confIsJson: false, + params: null, + paramsIsJson: false, note: '', }; diff --git a/airflow/www/static/js/dag/details/dagRun/index.tsx b/airflow/www/static/js/dag/details/dagRun/index.tsx index 3f69e693d5bb6..8ef1d4387c733 100644 --- a/airflow/www/static/js/dag/details/dagRun/index.tsx +++ b/airflow/www/static/js/dag/details/dagRun/index.tsx @@ -66,7 +66,7 @@ const DagRun = ({ runId }: Props) => { const detailsRef = useRef(null); const offsetHeight = useOffsetHeight(detailsRef); const run = dagRuns.find((dr) => dr.runId === runId); - const { onCopy, hasCopied } = useClipboard(run?.conf || ''); + const { onCopy, hasCopied } = useClipboard(run?.params || ''); if (!run) return null; const { 
executionDate, @@ -79,8 +79,8 @@ const DagRun = ({ runId }: Props) => { endDate, queuedAt, externalTrigger, - conf, - confIsJson, + params, + paramsIsJson, note, } = run; const graphParams = new URLSearchParamsWrapper({ @@ -207,14 +207,14 @@ const DagRun = ({ runId }: Props) => { - Run config + Params { - confIsJson + paramsIsJson ? ( { ) - : {conf ?? 'None'} + : {params ?? 'None'} } diff --git a/airflow/www/static/js/dag/grid/dagRuns/index.test.tsx b/airflow/www/static/js/dag/grid/dagRuns/index.test.tsx index e81d5cbfc13e4..69b57787e8b69 100644 --- a/airflow/www/static/js/dag/grid/dagRuns/index.test.tsx +++ b/airflow/www/static/js/dag/grid/dagRuns/index.test.tsx @@ -43,8 +43,8 @@ const generateRuns = (length: number): DagRun[] => ( lastSchedulingDecision: datestring, executionDate: datestring, externalTrigger: false, - conf: null, - confIsJson: false, + params: null, + paramsIsJson: false, note: 'someRandomValue', })) ); @@ -64,8 +64,8 @@ describe('Test DagRuns', () => { executionDate: '2021-11-08T21:14:19.704433+00:00', lastSchedulingDecision: datestring, externalTrigger: false, - conf: null, - confIsJson: false, + params: null, + paramsIsJson: false, note: 'someRandomValue', }, { @@ -80,8 +80,8 @@ describe('Test DagRuns', () => { executionDate: '2021-11-08T21:14:19.704433+00:00', lastSchedulingDecision: datestring, externalTrigger: false, - conf: null, - confIsJson: false, + params: null, + paramsIsJson: false, note: 'someRandomValue', }, ]; diff --git a/airflow/www/static/js/dag/grid/index.test.tsx b/airflow/www/static/js/dag/grid/index.test.tsx index 4475a75a345a7..746c7123b7fd8 100644 --- a/airflow/www/static/js/dag/grid/index.test.tsx +++ b/airflow/www/static/js/dag/grid/index.test.tsx @@ -102,8 +102,8 @@ const mockGridData = { lastSchedulingDecision: '2021-11-08T21:14:19.704433+00:00', note: 'myCoolDagRun', externalTrigger: false, - conf: null, - confIsJson: false, + params: null, + paramsIsJson: false, }, ], ordering: ['dataIntervalStart'], diff --git 
a/airflow/www/static/js/types/index.ts b/airflow/www/static/js/types/index.ts index 524e09a92a3a1..afcfdd968d35d 100644 --- a/airflow/www/static/js/types/index.ts +++ b/airflow/www/static/js/types/index.ts @@ -54,8 +54,8 @@ interface DagRun { endDate: string | null; lastSchedulingDecision: string | null; externalTrigger: boolean; - conf: string | null; - confIsJson: boolean; + params: string | null; + paramsIsJson: boolean; note: string | null; } diff --git a/airflow/www/static/js/utils/index.test.ts b/airflow/www/static/js/utils/index.test.ts index f0d7787ea644f..6bd04aa9ffcc4 100644 --- a/airflow/www/static/js/utils/index.test.ts +++ b/airflow/www/static/js/utils/index.test.ts @@ -130,8 +130,8 @@ describe('Test getDagRunLabel', () => { executionDate: '2021-12-09T21:14:19.704433+00:00', lastSchedulingDecision: '2021-11-08T21:14:19.704433+00:00', externalTrigger: false, - conf: null, - confIsJson: false, + params: null, + paramsIsJson: false, note: 'someRandomValue', } as DagRun; diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 9fc7d516c0ae1..137d3eac590a8 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -236,7 +236,7 @@

Trigger DAG -
  • Trigger DAG w/ config
  • +
  • Trigger DAG w/ params
  • {{ page_title }}

    -
  • Trigger DAG w/ config
  • +
  • Trigger DAG w/ params
  • {% endif %} diff --git a/airflow/www/templates/airflow/trigger.html b/airflow/www/templates/airflow/trigger.html index 8c7e590aa2c55..c8d2b0c9211d0 100644 --- a/airflow/www/templates/airflow/trigger.html +++ b/airflow/www/templates/airflow/trigger.html @@ -58,20 +58,13 @@

    Trigger DAG: {{ dag_id }}

    - - + +

    - To access configuration in your DAG use {{ '{{ dag_run.conf }}' }}. - {% if is_dag_run_conf_overrides_params %} - As core.dag_run_conf_overrides_params is set to True, so passing any configuration - here will override task params which can be accessed via {{ '{{ params }}' }}. - {% else %} - As core.dag_run_conf_overrides_params is set to False, so passing any configuration - here won't override task params. - {% endif %} + To access parameters in your DAG use {{ '{{ params }}' }}.

    - - + + {% for param in recent_params %} + {% endfor %}
    From cef854150bbd3303a9ad206ad09aafde3c2c28ba Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Thu, 26 Jan 2023 01:05:56 +0100 Subject: [PATCH 05/14] delete unecessary arg --- airflow/www/views.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index 33a8514c7522f..9ba6714ae9b3b 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1948,7 +1948,6 @@ def trigger(self, session=None): unpause = request.values.get("unpause") request_params = request.values.get("params") request_execution_date = request.values.get("execution_date", default=timezone.utcnow().isoformat()) - is_dag_run_conf_overrides_params = conf.getboolean("core", "dag_run_conf_overrides_params") dag = get_airflow_app().dag_bag.get_dag(dag_id) dag_orm = session.query(DagModel).filter(DagModel.dag_id == dag_id).first() if not dag_orm: @@ -2047,7 +2046,6 @@ def trigger(self, session=None): origin=origin, params=request_params, form=form, - is_dag_run_conf_overrides_params=is_dag_run_conf_overrides_params, recent_params=recent_params, ) except json.decoder.JSONDecodeError: From 8c2b293016f3514edf75278e5fdc97f797e218e9 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Thu, 26 Jan 2023 02:23:56 +0100 Subject: [PATCH 06/14] restore conf and deprecate it --- airflow/api_connexion/endpoints/dag_run_endpoint.py | 2 ++ airflow/api_connexion/openapi/v1.yaml | 11 +++++++++++ airflow/api_connexion/schemas/dag_run_schema.py | 1 + airflow/www/static/js/types/api-generated.ts | 11 +++++++++++ 4 files changed, 25 insertions(+) diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 010eb9b612c97..90c90900da0d5 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -162,6 +162,7 @@ def _fetch_dag_runs( "start_date", "end_date", "external_trigger", + "conf", "params", ] query = apply_sorting(query, order_by, to_replace, 
allowed_filter_attrs) @@ -314,6 +315,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: execution_date=logical_date, data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date), state=DagRunState.QUEUED, + conf=post_body.get("conf"), params=post_body.get("params"), external_trigger=True, dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id), diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 6cc18b8fb4b3a..f2bc82494c43d 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -2801,6 +2801,15 @@ components: type: boolean default: true readOnly: true + conf: + type: object + description: | + JSON object describing additional configuration parameters. + The value of this field can be set only when creating the object. If you try to modify the + field of an existing object, the request fails with an BAD_REQUEST error. + + *Deprecated since version 2.6.0*: Use 'params' instead. + deprecated: true params: type: object description: | @@ -2808,6 +2817,8 @@ components: The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with an BAD_REQUEST error. 
+ + *New in version 2.6.0* note: type: string description: | diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py index 156bb174ee32e..5c7825d62cf18 100644 --- a/airflow/api_connexion/schemas/dag_run_schema.py +++ b/airflow/api_connexion/schemas/dag_run_schema.py @@ -68,6 +68,7 @@ class Meta: end_date = auto_field(dump_only=True) state = DagStateField(dump_only=True) external_trigger = auto_field(dump_default=True, dump_only=True) + conf = ParamsObject() params = ParamsObject() data_interval_start = auto_field(dump_only=True) data_interval_end = auto_field(dump_only=True) diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 61994c26888d2..da26a8965aca6 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1031,11 +1031,22 @@ export interface components { state?: components["schemas"]["DagState"]; /** @default true */ external_trigger?: boolean; + /** + * @deprecated + * @description JSON object describing additional configuration parameters. + * The value of this field can be set only when creating the object. If you try to modify the + * field of an existing object, the request fails with an BAD_REQUEST error. + * + * *Deprecated since version 2.6.0*: Use 'params' instead. + */ + conf?: { [key: string]: unknown }; /** * @description JSON object describing additional configuration parameters. * * The value of this field can be set only when creating the object. If you try to modify the * field of an existing object, the request fails with an BAD_REQUEST error. 
+ * + * *New in version 2.6.0* */ params?: { [key: string]: unknown }; /** From 63d1829790e820a01072fa52bf27b14f324928bf Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Thu, 26 Jan 2023 02:31:55 +0100 Subject: [PATCH 07/14] add a new check for provided params --- airflow/models/dag.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index e68202d1c7200..e3e873e9c6997 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -70,6 +70,7 @@ AirflowException, AirflowSkipException, DuplicateTaskIdFound, + ParamValidationError, RemovedInAirflow3Warning, TaskNotFound, ) @@ -2550,6 +2551,33 @@ def add_logger_if_needed(ti: TaskInstance): # Remove the local variables we have added to the secrets_backend_list secrets_backend_list.pop(0) + def _check_params(self, dag_run_params): + """ + Validates & raise exception if there are any extra provided Params not defined in the dag, missing + Params which don't have a default value in the dag, or invalid params + """ + if dag_run_params is None: + dag_run_params = {} + for k, param in self.params.items(): + # As type can be an array, we would check if `null` is an allowed type or not + if not param.has_value and ("type" not in param.schema or "null" not in param.schema["type"]): + if k not in dag_run_params: + raise ParamValidationError( + f"You should provide a value for the required params without default value: {k}" + ) + + if k in dag_run_params: + try: + param.resolve(value=dag_run_params[k]) + except ParamValidationError as ve: + raise ParamValidationError(f"Invalid input for param {k}: {ve}") + + if dag_run_params is None: + return + for k, v in dag_run_params.items(): + if k not in self.params: + raise ParamValidationError(f"The param {k} is not defined in the dag params") + @provide_session def create_dagrun( self, @@ -2641,6 +2669,8 @@ def create_dagrun( "dag_run conf is deprecated. 
Please use params instead", DeprecationWarning, stacklevel=2 ) + self._check_params(params) + # create a copy of params before validating copied_params = copy.deepcopy(self.params) copied_params.update(conf or {}) From 360aa74258d3752df12fe04b297362c00867ec66 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Thu, 26 Jan 2023 02:51:27 +0100 Subject: [PATCH 08/14] add params to CLI and internal API and deprecate conf --- airflow/api/client/api_client.py | 5 ++++- airflow/api/client/json_client.py | 6 +++++- airflow/api/client/local_client.py | 5 ++++- airflow/api/common/trigger_dag.py | 8 ++++++++ airflow/cli/cli_parser.py | 14 +++++++++++++- airflow/cli/commands/dag_command.py | 11 +++++++++-- 6 files changed, 43 insertions(+), 6 deletions(-) diff --git a/airflow/api/client/api_client.py b/airflow/api/client/api_client.py index 222c0f142d263..dcec5fb23c408 100644 --- a/airflow/api/client/api_client.py +++ b/airflow/api/client/api_client.py @@ -30,12 +30,15 @@ def __init__(self, api_base_url, auth=None, session: httpx.Client | None = None) if auth: self._session.auth = auth - def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True): + def trigger_dag( + self, dag_id, run_id=None, conf=None, params=None, execution_date=None, replace_microseconds=True + ): """Create a dag run for the specified dag. 
:param dag_id: :param run_id: :param conf: + :param params: :param execution_date: :param replace_microseconds: :return: diff --git a/airflow/api/client/json_client.py b/airflow/api/client/json_client.py index 2eb91b3a422f7..f0fd634698f03 100644 --- a/airflow/api/client/json_client.py +++ b/airflow/api/client/json_client.py @@ -54,12 +54,15 @@ def _request(self, url: str, json=None, method: str = "GET") -> dict: raise OSError(data.get("error", "Server error")) return resp.json() - def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True): + def trigger_dag( + self, dag_id, run_id=None, conf=None, params=None, execution_date=None, replace_microseconds=True + ): """Trigger a DAG run. :param dag_id: The ID of the DAG to trigger. :param run_id: The ID of the DAG run to create. If not provided, a default ID will be generated. :param conf: A dictionary containing configuration data to pass to the DAG run. + :param params: A dictionary containing configuration data to pass to the DAG params. :param execution_date: The execution date for the DAG run, in the format "YYYY-MM-DDTHH:MM:SS". :param replace_microseconds: Whether to replace microseconds in the execution date with zeros. :return: A message indicating the status of the DAG run trigger. 
@@ -69,6 +72,7 @@ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, repla data = { "run_id": run_id, "conf": conf, + "params": params, "execution_date": execution_date, "replace_microseconds": replace_microseconds, } diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py index 2c8f471b396ae..3281a603fc116 100644 --- a/airflow/api/client/local_client.py +++ b/airflow/api/client/local_client.py @@ -28,11 +28,14 @@ class Client(api_client.Client): """Local API client implementation.""" - def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True): + def trigger_dag( + self, dag_id, run_id=None, conf=None, params=None, execution_date=None, replace_microseconds=True + ): dag_run = trigger_dag.trigger_dag( dag_id=dag_id, run_id=run_id, conf=conf, + params=params, execution_date=execution_date, replace_microseconds=replace_microseconds, ) diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py index 01da7745c7bf3..4931cf40abbfc 100644 --- a/airflow/api/common/trigger_dag.py +++ b/airflow/api/common/trigger_dag.py @@ -33,6 +33,7 @@ def _trigger_dag( dag_bag: DagBag, run_id: str | None = None, conf: dict | str | None = None, + params: dict | str | None = None, execution_date: datetime | None = None, replace_microseconds: bool = True, ) -> list[DagRun | None]: @@ -42,6 +43,7 @@ def _trigger_dag( :param dag_bag: DAG Bag model :param run_id: ID of the dag_run :param conf: configuration + :param params: dagrun params :param execution_date: date of execution :param replace_microseconds: whether microseconds should be zeroed :return: list of triggered dags @@ -80,6 +82,8 @@ def _trigger_dag( run_conf = None if conf: run_conf = conf if isinstance(conf, dict) else json.loads(conf) + if params: + params = params if isinstance(params, dict) else json.loads(params) dag_runs = [] dags_to_run = [dag] + dag.subdags @@ -89,6 +93,7 @@ def _trigger_dag( 
execution_date=execution_date, state=DagRunState.QUEUED, conf=run_conf, + params=params, external_trigger=True, dag_hash=dag_bag.dags_hash.get(dag_id), data_interval=data_interval, @@ -102,6 +107,7 @@ def trigger_dag( dag_id: str, run_id: str | None = None, conf: dict | str | None = None, + params: dict | str | None = None, execution_date: datetime | None = None, replace_microseconds: bool = True, ) -> DagRun | None: @@ -110,6 +116,7 @@ def trigger_dag( :param dag_id: DAG ID :param run_id: ID of the dag_run :param conf: configuration + :param params: dagrun params :param execution_date: date of execution :param replace_microseconds: whether microseconds should be zeroed :return: first dag run triggered - even if more than one Dag Runs were triggered or None @@ -124,6 +131,7 @@ def trigger_dag( dag_bag=dagbag, run_id=run_id, conf=conf, + params=params, execution_date=execution_date, replace_microseconds=replace_microseconds, ) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 6890368f9bcb3..65ffc3055f52b 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -432,6 +432,7 @@ def string_lower_type(val): # trigger_dag ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run") ARG_CONF = Arg(("-c", "--conf"), help="JSON string that gets pickled into the DagRun's conf attribute") +ARG_PARAMS = Arg(("-p", "--params"), help="JSON string that gets pickled into the DagRun's params attribute") ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate) ARG_REPLACE_MICRO = Arg( ("--no-replace-microseconds",), @@ -1154,7 +1155,16 @@ class GroupCommand(NamedTuple): name="trigger", help="Trigger a DAG run", func=lazy_load_command("airflow.cli.commands.dag_command.dag_trigger"), - args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE, ARG_VERBOSE, ARG_REPLACE_MICRO), + args=( + ARG_DAG_ID, + ARG_SUBDIR, + ARG_RUN_ID, + ARG_CONF, + ARG_PARAMS, + ARG_EXEC_DATE, + ARG_VERBOSE, + 
ARG_REPLACE_MICRO, + ), ), ActionCommand( name="delete", @@ -1248,6 +1258,7 @@ class GroupCommand(NamedTuple): ARG_DRY_RUN, ARG_VERBOSE, ARG_CONF, + ARG_PARAMS, ARG_RESET_DAG_RUN, ARG_RERUN_FAILED_TASKS, ARG_RUN_BACKWARDS, @@ -1281,6 +1292,7 @@ class GroupCommand(NamedTuple): ARG_DAG_ID, ARG_EXECUTION_DATE_OPTIONAL, ARG_CONF, + ARG_PARAMS, ARG_SUBDIR, ARG_SHOW_DAGRUN, ARG_IMGCAT_DAGRUN, diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 2df0a162c4a18..ed7f16ce4adf2 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -81,7 +81,9 @@ def dag_backfill(args, dag=None): run_conf = None if args.conf: run_conf = json.loads(args.conf) - + params = None + if args.params: + params = json.loads(args.params) for dag in dags: if args.task_regex: dag = dag.partial_subset( @@ -124,6 +126,7 @@ def dag_backfill(args, dag=None): delay_on_limit_secs=args.delay_on_limit, verbose=args.verbose, conf=run_conf, + params=params, rerun_failed_tasks=args.rerun_failed_tasks, run_backwards=args.run_backwards, continue_on_failures=args.continue_on_failures, @@ -146,6 +149,7 @@ def dag_trigger(args): dag_id=args.dag_id, run_id=args.run_id, conf=args.conf, + params=args.params, execution_date=args.exec_date, replace_microseconds=args.replace_microseconds, ) @@ -459,9 +463,12 @@ def dag_test(args, dag=None, session=None): run_conf = json.loads(args.conf) except ValueError as e: raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. 
Error: {e}") + params = None + if args.params: + params = json.loads(args.params) execution_date = args.execution_date or timezone.utcnow() dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id) - dag.test(execution_date=execution_date, run_conf=run_conf, session=session) + dag.test(execution_date=execution_date, run_conf=run_conf, params=params, session=session) show_dagrun = args.show_dagrun imgcat = args.imgcat_dagrun filename = args.save_dagrun From fedb90af9c51b4e6ef1a1ea1e0462c9328b8da89 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 29 Jan 2023 23:38:13 +0100 Subject: [PATCH 09/14] add migration script which add the column params to DagRun --- .../0124_2_6_0_add_params_column_to_dagrun.py | 48 + docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 1765 +++++++++-------- docs/apache-airflow/migrations-ref.rst | 4 +- 4 files changed, 936 insertions(+), 883 deletions(-) create mode 100644 airflow/migrations/versions/0124_2_6_0_add_params_column_to_dagrun.py diff --git a/airflow/migrations/versions/0124_2_6_0_add_params_column_to_dagrun.py b/airflow/migrations/versions/0124_2_6_0_add_params_column_to_dagrun.py new file mode 100644 index 0000000000000..d5e34e7c25b2a --- /dev/null +++ b/airflow/migrations/versions/0124_2_6_0_add_params_column_to_dagrun.py @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add params column to DagRun + +Revision ID: e8a79aa51603 +Revises: 6abdffdd4815 +Create Date: 2023-01-29 23:29:47.426929 + +""" +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "e8a79aa51603" +down_revision = "6abdffdd4815" +branch_labels = None +depends_on = None +airflow_version = "2.6.0" + + +def upgrade(): + """Apply adding params column to DagRun""" + with op.batch_alter_table("dag_run") as batch_op: + batch_op.add_column(sa.Column("params", sa.PickleType(), nullable=True)) + + +def downgrade(): + """Revert adding params column to DagRun""" + with op.batch_alter_table("dag_run") as batch_op: + batch_op.drop_column("params") diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 12315d49e1b4e..6b8cb62376a5c 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -d1cf9a4117e6dac254dafd45128cc25ef0849a7464a4d36a52539955e1f78bc5 \ No newline at end of file +f738043bfd2535a3964f0149cfe54304e9be21ca16cab239996cda0fdb1f93e6 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 61ed186741666..1d47df1302b94 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,275 +4,275 @@ - - + + %3 - + ab_permission - -ab_permission - -id - [INTEGER] - NOT NULL - -name - [VARCHAR(100)] - NOT NULL + +ab_permission + +id + [INTEGER] + NOT NULL 
+ +name + [VARCHAR(100)] + NOT NULL ab_permission_view - -ab_permission_view - -id - [INTEGER] - NOT NULL - -permission_id - [INTEGER] - -view_menu_id - [INTEGER] + +ab_permission_view + +id + [INTEGER] + NOT NULL + +permission_id + [INTEGER] + +view_menu_id + [INTEGER] ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_permission_view_role - -ab_permission_view_role - -id - [INTEGER] - NOT NULL - -permission_view_id - [INTEGER] - -role_id - [INTEGER] + +ab_permission_view_role + +id + [INTEGER] + NOT NULL + +permission_view_id + [INTEGER] + +role_id + [INTEGER] ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} ab_view_menu - -ab_view_menu - -id - [INTEGER] - NOT NULL - -name - [VARCHAR(250)] - NOT NULL + +ab_view_menu + +id + [INTEGER] + NOT NULL + +name + [VARCHAR(250)] + NOT NULL ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_role - -ab_role - -id - [INTEGER] - NOT NULL - -name - [VARCHAR(64)] - NOT NULL + +ab_role + +id + [INTEGER] + NOT NULL + +name + [VARCHAR(64)] + NOT NULL ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} ab_user_role - -ab_user_role - -id - [INTEGER] - NOT NULL - -role_id - [INTEGER] - -user_id - [INTEGER] + +ab_user_role + +id + [INTEGER] + NOT NULL + +role_id + [INTEGER] + +user_id + [INTEGER] ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_register_user - -ab_register_user - -id - [INTEGER] - NOT NULL - -email - [VARCHAR(256)] - NOT NULL - -first_name - [VARCHAR(64)] - NOT NULL - -last_name - [VARCHAR(64)] - NOT NULL - -password - [VARCHAR(256)] - -registration_date - [DATETIME] - -registration_hash - [VARCHAR(256)] - -username - [VARCHAR(256)] - NOT NULL + +ab_register_user + +id + [INTEGER] + NOT NULL + +email + [VARCHAR(256)] + NOT NULL + +first_name + [VARCHAR(64)] + NOT NULL + +last_name + [VARCHAR(64)] + NOT NULL + +password + [VARCHAR(256)] + +registration_date + [DATETIME] + +registration_hash + [VARCHAR(256)] + +username + [VARCHAR(256)] + NOT NULL 
ab_user - -ab_user - -id - [INTEGER] - NOT NULL - -active - [BOOLEAN] - -changed_by_fk - [INTEGER] - -changed_on - [DATETIME] - -created_by_fk - [INTEGER] - -created_on - [DATETIME] - -email - [VARCHAR(256)] - NOT NULL - -fail_login_count - [INTEGER] - -first_name - [VARCHAR(64)] - NOT NULL - -last_login - [DATETIME] - -last_name - [VARCHAR(64)] - NOT NULL - -login_count - [INTEGER] - -password - [VARCHAR(256)] - -username - [VARCHAR(256)] - NOT NULL + +ab_user + +id + [INTEGER] + NOT NULL + +active + [BOOLEAN] + +changed_by_fk + [INTEGER] + +changed_on + [DATETIME] + +created_by_fk + [INTEGER] + +created_on + [DATETIME] + +email + [VARCHAR(256)] + NOT NULL + +fail_login_count + [INTEGER] + +first_name + [VARCHAR(64)] + NOT NULL + +last_login + [DATETIME] + +last_name + [VARCHAR(64)] + NOT NULL + +login_count + [INTEGER] + +password + [VARCHAR(256)] + +username + [VARCHAR(256)] + NOT NULL ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note + +dag_run_note + +dag_run_id + [INTEGER] + NOT NULL + +content + [VARCHAR(1000)] -dag_run_id - [INTEGER] - NOT NULL +created_at + [TIMESTAMP] + NOT NULL -content - [VARCHAR(1000)] +updated_at + [TIMESTAMP] + NOT NULL -created_at - [TIMESTAMP] - NOT NULL - -updated_at - [TIMESTAMP] - NOT NULL - -user_id - [INTEGER] +user_id + [INTEGER] ab_user--dag_run_note - -0..N -{0,1} + +0..N +{0,1} @@ -313,473 +313,476 @@ ab_user--task_instance_note - -0..N -{0,1} + +0..N +{0,1} alembic_version - -alembic_version - -version_num - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + [VARCHAR(32)] + NOT NULL callback_request - -callback_request - -id - [INTEGER] - NOT NULL - -callback_data - [JSON] - NOT NULL - -callback_type - [VARCHAR(20)] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -priority_weight - [INTEGER] - NOT NULL - -processor_subdir - [VARCHAR(2000)] + +callback_request + +id + [INTEGER] + NOT 
NULL + +callback_data + [JSON] + NOT NULL + +callback_type + [VARCHAR(20)] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +priority_weight + [INTEGER] + NOT NULL + +processor_subdir + [VARCHAR(2000)] connection - -connection - -id - [INTEGER] - NOT NULL - -conn_id - [VARCHAR(250)] - NOT NULL - -conn_type - [VARCHAR(500)] - NOT NULL - -description - [VARCHAR(5000)] - -extra - [TEXT] - -host - [VARCHAR(500)] - -is_encrypted - [BOOLEAN] - -is_extra_encrypted - [BOOLEAN] - -login - [VARCHAR(500)] - -password - [VARCHAR(5000)] - -port - [INTEGER] - -schema - [VARCHAR(500)] + +connection + +id + [INTEGER] + NOT NULL + +conn_id + [VARCHAR(250)] + NOT NULL + +conn_type + [VARCHAR(500)] + NOT NULL + +description + [VARCHAR(5000)] + +extra + [TEXT] + +host + [VARCHAR(500)] + +is_encrypted + [BOOLEAN] + +is_extra_encrypted + [BOOLEAN] + +login + [VARCHAR(500)] + +password + [VARCHAR(5000)] + +port + [INTEGER] + +schema + [VARCHAR(500)] dag - -dag - -dag_id - [VARCHAR(250)] - NOT NULL - -default_view - [VARCHAR(25)] - -description - [TEXT] - -fileloc - [VARCHAR(2000)] - -has_import_errors - [BOOLEAN] - -has_task_concurrency_limits - [BOOLEAN] - NOT NULL - -is_active - [BOOLEAN] - -is_paused - [BOOLEAN] - -is_subdag - [BOOLEAN] - -last_expired - [TIMESTAMP] - -last_parsed_time - [TIMESTAMP] - -last_pickled - [TIMESTAMP] - -max_active_runs - [INTEGER] - -max_active_tasks - [INTEGER] - NOT NULL - -next_dagrun - [TIMESTAMP] - -next_dagrun_create_after - [TIMESTAMP] - -next_dagrun_data_interval_end - [TIMESTAMP] - -next_dagrun_data_interval_start - [TIMESTAMP] - -owners - [VARCHAR(2000)] - -pickle_id - [INTEGER] - -processor_subdir - [VARCHAR(2000)] - -root_dag_id - [VARCHAR(250)] - -schedule_interval - [TEXT] - -scheduler_lock - [BOOLEAN] - -timetable_description - [VARCHAR(1000)] + +dag + +dag_id + [VARCHAR(250)] + NOT NULL + +default_view + [VARCHAR(25)] + +description + [TEXT] + +fileloc + [VARCHAR(2000)] + +has_import_errors + [BOOLEAN] + +has_task_concurrency_limits + 
[BOOLEAN] + NOT NULL + +is_active + [BOOLEAN] + +is_paused + [BOOLEAN] + +is_subdag + [BOOLEAN] + +last_expired + [TIMESTAMP] + +last_parsed_time + [TIMESTAMP] + +last_pickled + [TIMESTAMP] + +max_active_runs + [INTEGER] + +max_active_tasks + [INTEGER] + NOT NULL + +next_dagrun + [TIMESTAMP] + +next_dagrun_create_after + [TIMESTAMP] + +next_dagrun_data_interval_end + [TIMESTAMP] + +next_dagrun_data_interval_start + [TIMESTAMP] + +owners + [VARCHAR(2000)] + +pickle_id + [INTEGER] + +processor_subdir + [VARCHAR(2000)] + +root_dag_id + [VARCHAR(250)] + +schedule_interval + [TEXT] + +scheduler_lock + [BOOLEAN] + +timetable_description + [VARCHAR(1000)] dag_owner_attributes - -dag_owner_attributes - -dag_id - [VARCHAR(250)] - NOT NULL - -owner - [VARCHAR(500)] - NOT NULL - -link - [VARCHAR(500)] - NOT NULL + +dag_owner_attributes + +dag_id + [VARCHAR(250)] + NOT NULL + +owner + [VARCHAR(500)] + NOT NULL + +link + [VARCHAR(500)] + NOT NULL dag--dag_owner_attributes - -0..N -{0,1} + +0..N +{0,1} dag_schedule_dataset_reference - -dag_schedule_dataset_reference - -dag_id - [VARCHAR(250)] - NOT NULL - -dataset_id - [INTEGER] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -updated_at - [TIMESTAMP] - NOT NULL + +dag_schedule_dataset_reference + +dag_id + [VARCHAR(250)] + NOT NULL + +dataset_id + [INTEGER] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL dag--dag_schedule_dataset_reference - -0..N -{0,1} + +0..N +{0,1} dag_tag - -dag_tag - -dag_id - [VARCHAR(250)] - NOT NULL - -name - [VARCHAR(100)] - NOT NULL + +dag_tag + +dag_id + [VARCHAR(250)] + NOT NULL + +name + [VARCHAR(100)] + NOT NULL dag--dag_tag - -0..N -{0,1} + +0..N +{0,1} dag_warning - -dag_warning - -dag_id - [VARCHAR(250)] - NOT NULL - -warning_type - [VARCHAR(50)] - NOT NULL - -message - [TEXT] - NOT NULL - -timestamp - [TIMESTAMP] - NOT NULL + +dag_warning + +dag_id + [VARCHAR(250)] + NOT NULL + +warning_type + [VARCHAR(50)] + NOT NULL + +message + [TEXT] + NOT NULL 
+ +timestamp + [TIMESTAMP] + NOT NULL dag--dag_warning - -0..N -{0,1} + +0..N +{0,1} dataset_dag_run_queue - -dataset_dag_run_queue - -dataset_id - [INTEGER] - NOT NULL - -target_dag_id - [VARCHAR(250)] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL + +dataset_dag_run_queue + +dataset_id + [INTEGER] + NOT NULL + +target_dag_id + [VARCHAR(250)] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL dag--dataset_dag_run_queue - -0..N -{0,1} + +0..N +{0,1} task_outlet_dataset_reference - -task_outlet_dataset_reference - -dag_id - [VARCHAR(250)] - NOT NULL - -dataset_id - [INTEGER] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -updated_at - [TIMESTAMP] - NOT NULL + +task_outlet_dataset_reference + +dag_id + [VARCHAR(250)] + NOT NULL + +dataset_id + [INTEGER] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL dag--task_outlet_dataset_reference - -0..N -{0,1} + +0..N +{0,1} dag_code - -dag_code - -fileloc_hash - [BIGINT] - NOT NULL - -fileloc - [VARCHAR(2000)] - NOT NULL - -last_updated - [TIMESTAMP] - NOT NULL - -source_code - [TEXT] - NOT NULL + +dag_code + +fileloc_hash + [BIGINT] + NOT NULL + +fileloc + [VARCHAR(2000)] + NOT NULL + +last_updated + [TIMESTAMP] + NOT NULL + +source_code + [TEXT] + NOT NULL dag_pickle - -dag_pickle - -id - [INTEGER] - NOT NULL - -created_dttm - [TIMESTAMP] - -pickle - [BLOB] - -pickle_hash - [BIGINT] + +dag_pickle + +id + [INTEGER] + NOT NULL + +created_dttm + [TIMESTAMP] + +pickle + [BLOB] + +pickle_hash + [BIGINT] dag_run - -dag_run - -id - [INTEGER] - NOT NULL - -conf - [BLOB] - -creating_job_id - [INTEGER] - -dag_hash - [VARCHAR(32)] - -dag_id - [VARCHAR(250)] - NOT NULL - -data_interval_end - [TIMESTAMP] - -data_interval_start - [TIMESTAMP] - -end_date - [TIMESTAMP] - -execution_date - [TIMESTAMP] - NOT NULL - -external_trigger - [BOOLEAN] - -last_scheduling_decision - [TIMESTAMP] - -log_template_id - [INTEGER] - 
-queued_at - [TIMESTAMP] - -run_id - [VARCHAR(250)] - NOT NULL - -run_type - [VARCHAR(50)] - NOT NULL - -start_date - [TIMESTAMP] - -state - [VARCHAR(50)] - -updated_at - [TIMESTAMP] + +dag_run + +id + [INTEGER] + NOT NULL + +conf + [BLOB] + +creating_job_id + [INTEGER] + +dag_hash + [VARCHAR(32)] + +dag_id + [VARCHAR(250)] + NOT NULL + +data_interval_end + [TIMESTAMP] + +data_interval_start + [TIMESTAMP] + +end_date + [TIMESTAMP] + +execution_date + [TIMESTAMP] + NOT NULL + +external_trigger + [BOOLEAN] + +last_scheduling_decision + [TIMESTAMP] + +log_template_id + [INTEGER] + +params + [BLOB] + +queued_at + [TIMESTAMP] + +run_id + [VARCHAR(250)] + NOT NULL + +run_type + [VARCHAR(50)] + NOT NULL + +start_date + [TIMESTAMP] + +state + [VARCHAR(50)] + +updated_at + [TIMESTAMP] dag_run--dag_run_note - -0..N -{0,1} + +0..N +{0,1} dagrun_dataset_event - -dagrun_dataset_event - -dag_run_id - [INTEGER] - NOT NULL - -event_id - [INTEGER] - NOT NULL + +dagrun_dataset_event + +dag_run_id + [INTEGER] + NOT NULL + +event_id + [INTEGER] + NOT NULL dag_run--dagrun_dataset_event - -0..N -{0,1} + +0..N +{0,1} @@ -880,16 +883,16 @@ dag_run--task_instance - -0..N -{0,1} + +0..N +{0,1} dag_run--task_instance - -0..N -{0,1} + +0..N +{0,1} @@ -940,16 +943,16 @@ dag_run--task_reschedule - -0..N -{0,1} + +0..N +{0,1} dag_run--task_reschedule - -0..N -{0,1} + +0..N +{0,1} @@ -1252,370 +1255,370 @@ log_template - -log_template - -id - [INTEGER] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -elasticsearch_id - [TEXT] - NOT NULL - -filename - [TEXT] - NOT NULL + +log_template + +id + [INTEGER] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +elasticsearch_id + [TEXT] + NOT NULL + +filename + [TEXT] + NOT NULL log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} dataset - -dataset - -id - [INTEGER] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -extra - [JSON] - NOT NULL - -is_orphaned - [BOOLEAN] - NOT NULL - -updated_at - [TIMESTAMP] - NOT NULL - -uri - [VARCHAR(3000)] - NOT 
NULL + +dataset + +id + [INTEGER] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +extra + [JSON] + NOT NULL + +is_orphaned + [BOOLEAN] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL + +uri + [VARCHAR(3000)] + NOT NULL dataset--dag_schedule_dataset_reference - -0..N -{0,1} + +0..N +{0,1} dataset--dataset_dag_run_queue - -0..N -{0,1} + +0..N +{0,1} dataset--task_outlet_dataset_reference - -0..N -{0,1} + +0..N +{0,1} dataset_event - -dataset_event - -id - [INTEGER] - NOT NULL - -dataset_id - [INTEGER] - NOT NULL - -extra - [JSON] - NOT NULL - -source_dag_id - [VARCHAR(250)] - -source_map_index - [INTEGER] - -source_run_id - [VARCHAR(250)] - -source_task_id - [VARCHAR(250)] - -timestamp - [TIMESTAMP] - NOT NULL + +dataset_event + +id + [INTEGER] + NOT NULL + +dataset_id + [INTEGER] + NOT NULL + +extra + [JSON] + NOT NULL + +source_dag_id + [VARCHAR(250)] + +source_map_index + [INTEGER] + +source_run_id + [VARCHAR(250)] + +source_task_id + [VARCHAR(250)] + +timestamp + [TIMESTAMP] + NOT NULL dataset_event--dagrun_dataset_event - -0..N -{0,1} + +0..N +{0,1} import_error - -import_error - -id - [INTEGER] - NOT NULL - -filename - [VARCHAR(1024)] - -stacktrace - [TEXT] - -timestamp - [TIMESTAMP] + +import_error + +id + [INTEGER] + NOT NULL + +filename + [VARCHAR(1024)] + +stacktrace + [TEXT] + +timestamp + [TIMESTAMP] job - -job - -id - [INTEGER] - NOT NULL - -dag_id - [VARCHAR(250)] - -end_date - [TIMESTAMP] - -executor_class - [VARCHAR(500)] - -hostname - [VARCHAR(500)] - -job_type - [VARCHAR(30)] - -latest_heartbeat - [TIMESTAMP] - -start_date - [TIMESTAMP] - -state - [VARCHAR(20)] - -unixname - [VARCHAR(1000)] + +job + +id + [INTEGER] + NOT NULL + +dag_id + [VARCHAR(250)] + +end_date + [TIMESTAMP] + +executor_class + [VARCHAR(500)] + +hostname + [VARCHAR(500)] + +job_type + [VARCHAR(30)] + +latest_heartbeat + [TIMESTAMP] + +start_date + [TIMESTAMP] + +state + [VARCHAR(20)] + +unixname + [VARCHAR(1000)] log - -log - -id - [INTEGER] - NOT NULL - -dag_id - 
[VARCHAR(250)] - -dttm - [TIMESTAMP] - -event - [VARCHAR(30)] - -execution_date - [TIMESTAMP] - -extra - [TEXT] - -map_index - [INTEGER] - -owner - [VARCHAR(500)] - -task_id - [VARCHAR(250)] + +log + +id + [INTEGER] + NOT NULL + +dag_id + [VARCHAR(250)] + +dttm + [TIMESTAMP] + +event + [VARCHAR(30)] + +execution_date + [TIMESTAMP] + +extra + [TEXT] + +map_index + [INTEGER] + +owner + [VARCHAR(500)] + +task_id + [VARCHAR(250)] trigger - -trigger - -id - [INTEGER] - NOT NULL - -classpath - [VARCHAR(1000)] - NOT NULL - -created_date - [TIMESTAMP] - NOT NULL - -kwargs - [JSON] - NOT NULL - -triggerer_id - [INTEGER] + +trigger + +id + [INTEGER] + NOT NULL + +classpath + [VARCHAR(1000)] + NOT NULL + +created_date + [TIMESTAMP] + NOT NULL + +kwargs + [JSON] + NOT NULL + +triggerer_id + [INTEGER] trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} serialized_dag - -serialized_dag - -dag_id - [VARCHAR(250)] - NOT NULL - -dag_hash - [VARCHAR(32)] - NOT NULL - -data - [JSON] - -data_compressed - [BLOB] - -fileloc - [VARCHAR(2000)] - NOT NULL - -fileloc_hash - [BIGINT] - NOT NULL - -last_updated - [TIMESTAMP] - NOT NULL - -processor_subdir - [VARCHAR(2000)] + +serialized_dag + +dag_id + [VARCHAR(250)] + NOT NULL + +dag_hash + [VARCHAR(32)] + NOT NULL + +data + [JSON] + +data_compressed + [BLOB] + +fileloc + [VARCHAR(2000)] + NOT NULL + +fileloc_hash + [BIGINT] + NOT NULL + +last_updated + [TIMESTAMP] + NOT NULL + +processor_subdir + [VARCHAR(2000)] session - -session - -id - [INTEGER] - NOT NULL - -data - [BLOB] - -expiry - [DATETIME] - -session_id - [VARCHAR(255)] + +session + +id + [INTEGER] + NOT NULL + +data + [BLOB] + +expiry + [DATETIME] + +session_id + [VARCHAR(255)] sla_miss - -sla_miss - -dag_id - [VARCHAR(250)] - NOT NULL - -execution_date - [TIMESTAMP] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -description - [TEXT] - -email_sent - [BOOLEAN] - -notification_sent - [BOOLEAN] - -timestamp - [TIMESTAMP] + +sla_miss + +dag_id + [VARCHAR(250)] + NOT NULL + 
+execution_date + [TIMESTAMP] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +description + [TEXT] + +email_sent + [BOOLEAN] + +notification_sent + [BOOLEAN] + +timestamp + [TIMESTAMP] slot_pool - -slot_pool - -id - [INTEGER] - NOT NULL - -description - [TEXT] - -pool - [VARCHAR(256)] - -slots - [INTEGER] + +slot_pool + +id + [INTEGER] + NOT NULL + +description + [TEXT] + +pool + [VARCHAR(256)] + +slots + [INTEGER] variable - -variable - -id - [INTEGER] - NOT NULL - -description - [TEXT] - -is_encrypted - [BOOLEAN] - -key - [VARCHAR(250)] - -val - [TEXT] + +variable + +id + [INTEGER] + NOT NULL + +description + [TEXT] + +is_encrypted + [BOOLEAN] + +key + [VARCHAR(250)] + +val + [TEXT] diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 0391380c08639..f5e58dc45ef73 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``6abdffdd4815`` (head) | ``290244fb8b83`` | ``2.6.0`` | add dttm index on log table | +| ``e8a79aa51603`` (head) | ``6abdffdd4815`` | ``2.6.0`` | Add params column to DagRun | ++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ +| ``6abdffdd4815`` | ``290244fb8b83`` | ``2.6.0`` | add dttm index on log table | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``290244fb8b83`` | ``1986afd32c1b`` | ``2.5.0`` | Add is_orphaned to DatasetModel | 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ From 2beb9fe7a0e7c3d057ab1a53a8daffdfafcff95b Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Mon, 30 Jan 2023 00:47:04 +0100 Subject: [PATCH 10/14] fix API tests and add params tests --- tests/api/client/test_local_client.py | 19 ++++++ .../endpoints/test_dag_run_endpoint.py | 28 ++++++++- .../schemas/test_dag_run_schema.py | 63 ++++++++++++++++++- 3 files changed, 105 insertions(+), 5 deletions(-) diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py index ff2ea3b048a97..0ba01eeea6c4e 100644 --- a/tests/api/client/test_local_client.py +++ b/tests/api/client/test_local_client.py @@ -81,6 +81,7 @@ def test_trigger_dag(self, mock): execution_date=EXECDATE_NOFRACTIONS, state=DagRunState.QUEUED, conf=None, + params=None, external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, @@ -94,6 +95,7 @@ def test_trigger_dag(self, mock): execution_date=EXECDATE_NOFRACTIONS, state=DagRunState.QUEUED, conf=None, + params=None, external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, @@ -108,6 +110,7 @@ def test_trigger_dag(self, mock): execution_date=EXECDATE_NOFRACTIONS, state=DagRunState.QUEUED, conf=None, + params=None, external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, @@ -122,6 +125,22 @@ def test_trigger_dag(self, mock): execution_date=EXECDATE_NOFRACTIONS, state=DagRunState.QUEUED, conf=json.loads(conf), + params=None, + external_trigger=True, + dag_hash=expected_dag_hash, + data_interval=expected_data_interval, + ) + mock.reset_mock() + + # test params + params = '{"name": "John"}' + self.client.trigger_dag(dag_id=test_dag_id, params=params) + mock.assert_called_once_with( + run_id=run_id, + execution_date=EXECDATE_NOFRACTIONS, + state=DagRunState.QUEUED, + conf=None, + params=json.loads(params), 
external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 9e113f2b13b88..a6910a495c32b 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -236,6 +236,7 @@ def test_should_respond_200(self, session): "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -294,6 +295,7 @@ def test_should_respond_200(self, session): "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -310,6 +312,7 @@ def test_should_respond_200(self, session): "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -366,6 +369,7 @@ def test_return_correct_results_with_order_by(self, session): "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -382,6 +386,7 @@ def test_return_correct_results_with_order_by(self, session): "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -634,6 +639,7 @@ def test_should_respond_200(self): "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -650,6 +656,7 @@ def test_should_respond_200(self): "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, 
"data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -693,6 +700,7 @@ def test_order_by_descending_works(self): "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -709,6 +717,7 @@ def test_order_by_descending_works(self): "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -750,6 +759,7 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -766,6 +776,7 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions "external_trigger": True, "start_date": self.default_time, "conf": {}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -1063,6 +1074,7 @@ def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_d expected_dag_run_id = dag_run_id assert response.json == { "conf": {}, + "params": {}, "dag_id": "TEST_DAG_ID", "dag_run_id": expected_dag_run_id, "end_date": None, @@ -1113,6 +1125,7 @@ def test_should_response_200_for_matching_execution_date_logical_date(self): assert response.status_code == 200 assert response.json == { "conf": {}, + "params": {}, "dag_id": "TEST_DAG_ID", "dag_run_id": dag_run_id, "end_date": None, @@ -1168,10 +1181,18 @@ def test_should_response_400_for_naive_datetime_and_bad_datetime(self, data, exp "conf": "some string", }, "'some string' is not of type 'object' - 'conf'", - ) + ), + ( + { + "dag_run_id": "TEST_DAG_RUN", + "execution_date": "2020-06-11T18:00:00+00:00", + "params": "some string", + }, + "'some string' is not of type 
'object' - 'params'", + ), ], ) - def test_should_response_400_for_non_dict_dagrun_conf(self, data, expected): + def test_should_response_400_for_non_dict_dagrun_conf_or_params(self, data, expected): self._create_dag("TEST_DAG_ID") response = self.client.post( "api/v1/dags/TEST_DAG_ID/dagRuns", json=data, environ_overrides={"REMOTE_USER": "test"} @@ -1329,6 +1350,7 @@ def test_should_respond_200(self, state, run_type, dag_maker, session): assert response.status_code == 200 assert response.json == { "conf": {}, + "params": {}, "dag_id": dag_id, "dag_run_id": dag_run_id, "end_date": dr.end_date.isoformat(), @@ -1425,6 +1447,7 @@ def test_should_respond_200(self, dag_maker, session): assert response.status_code == 200 assert response.json == { "conf": {}, + "params": {}, "dag_id": dag_id, "dag_run_id": dag_run_id, "end_date": None, @@ -1627,6 +1650,7 @@ def test_should_respond_200(self, dag_maker, session): assert dr.note == new_note_value assert response.json == { "conf": {}, + "params": {}, "dag_id": dr.dag_id, "dag_run_id": dr.run_id, "end_date": dr.end_date.isoformat(), diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py b/tests/api_connexion/schemas/test_dag_run_schema.py index 74a119ca55cc2..668a7f1750c4f 100644 --- a/tests/api_connexion/schemas/test_dag_run_schema.py +++ b/tests/api_connexion/schemas/test_dag_run_schema.py @@ -35,12 +35,15 @@ SECOND_TIME = "2020-06-10T13:59:56.336000+00:00" +THIRD_TIME = "2020-06-11T13:59:56.336000+00:00" + class TestDAGRunBase: def setup_method(self) -> None: clear_db_runs() self.default_time = DEFAULT_TIME self.second_time = SECOND_TIME + self.third_time = THIRD_TIME def teardown_method(self) -> None: clear_db_runs() @@ -57,6 +60,7 @@ def test_serialize(self, session): execution_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), conf='{"start": "stop"}', + params='{"param_key": "param_value"}', ) session.add(dagrun_model) session.commit() @@ -73,6 +77,7 @@ def 
test_serialize(self, session): "external_trigger": True, "start_date": self.default_time, "conf": {"start": "stop"}, + "params": {"param_key": "param_value"}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -111,6 +116,30 @@ def test_serialize(self, session): "conf": {"start": "stop"}, }, ), + ( + { + "dag_run_id": "my-dag-run", + "execution_date": DEFAULT_TIME, + "params": {"start": "stop"}, + }, + { + "run_id": "my-dag-run", + "execution_date": parse(DEFAULT_TIME), + "params": {"start": "stop"}, + }, + ), + ( + { + "dag_run_id": "my-dag-run", + "execution_date": DEFAULT_TIME, + "params": '{"start": "stop"}', + }, + { + "run_id": "my-dag-run", + "execution_date": parse(DEFAULT_TIME), + "params": {"start": "stop"}, + }, + ), ], ) def test_deserialize(self, serialized_dagrun, expected_result): @@ -150,10 +179,19 @@ def test_serialize(self, session): start_date=timezone.parse(self.default_time), run_type=DagRunType.MANUAL.value, ) - dagruns = [dagrun_model_1, dagrun_model_2] + dagrun_model_3 = DagRun( + dag_id="my-dag-run", + run_id="my-dag-run-3", + state="running", + execution_date=timezone.parse(self.third_time), + start_date=timezone.parse(self.default_time), + run_type=DagRunType.MANUAL.value, + params='{"start": "stop"}', + ) + dagruns = [dagrun_model_1, dagrun_model_2, dagrun_model_3] session.add_all(dagruns) session.commit() - instance = DAGRunCollection(dag_runs=dagruns, total_entries=2) + instance = DAGRunCollection(dag_runs=dagruns, total_entries=3) deserialized_dagruns = dagrun_collection_schema.dump(instance) assert deserialized_dagruns == { "dag_runs": [ @@ -167,6 +205,7 @@ def test_serialize(self, session): "state": "running", "start_date": self.default_time, "conf": {"start": "stop"}, + "params": {}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -183,6 +222,24 @@ def test_serialize(self, session): "external_trigger": True, "start_date": self.default_time, 
"conf": {}, + "params": {}, + "data_interval_end": None, + "data_interval_start": None, + "last_scheduling_decision": None, + "run_type": "manual", + "note": None, + }, + { + "dag_id": "my-dag-run", + "dag_run_id": "my-dag-run-3", + "end_date": None, + "state": "running", + "execution_date": self.third_time, + "logical_date": self.third_time, + "external_trigger": True, + "start_date": self.default_time, + "conf": {}, + "params": {"start": "stop"}, "data_interval_end": None, "data_interval_start": None, "last_scheduling_decision": None, @@ -190,5 +247,5 @@ def test_serialize(self, session): "note": None, }, ], - "total_entries": 2, + "total_entries": 3, } From c6c8faa20cd97f4eae30d3dd3aceb1584e953d0d Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 1 Feb 2023 02:22:01 +0100 Subject: [PATCH 11/14] fix WWW tests --- airflow/www/api/experimental/endpoints.py | 2 +- tests/www/api/experimental/test_endpoints.py | 6 +-- tests/www/views/test_views_grid.py | 8 +-- tests/www/views/test_views_trigger_dag.py | 53 ++++++++++---------- 4 files changed, 35 insertions(+), 34 deletions(-) diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py index d23f294ce7c38..c745db5c6698e 100644 --- a/airflow/www/api/experimental/endpoints.py +++ b/airflow/www/api/experimental/endpoints.py @@ -122,7 +122,7 @@ def trigger_dag(dag_id): replace_microseconds = to_boolean(data["replace_microseconds"]) try: - dr = trigger.trigger_dag(dag_id, run_id, params, execution_date, replace_microseconds) + dr = trigger.trigger_dag(dag_id, run_id, None, params, execution_date, replace_microseconds) except AirflowException as err: log.error(err) response = jsonify(error=f"{err}") diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py index 86a145f5b81ae..46228f1f29a42 100644 --- a/tests/www/api/experimental/test_endpoints.py +++ b/tests/www/api/experimental/test_endpoints.py @@ -157,10 +157,10 @@ def 
test_trigger_dag(self): ) assert 404 == response.status_code - # Test error for bad conf data + # Test error for bad params data response = self.client.post( url_template.format("example_bash_operator"), - data=json.dumps({"conf": "This is a string not a dict"}), + data=json.dumps({"params": "This is a string not a dict"}), content_type="application/json", ) assert 400 == response.status_code @@ -168,7 +168,7 @@ def test_trigger_dag(self): # Test OK case response = self.client.post( url_template.format("example_bash_operator"), - data=json.dumps({"run_id": run_id, "conf": {"param": "value"}}), + data=json.dumps({"run_id": run_id}), content_type="application/json", ) self.assert_deprecated(response) diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py index 79e9401f684f0..f238c752c949d 100644 --- a/tests/www/views/test_views_grid.py +++ b/tests/www/views/test_views_grid.py @@ -194,8 +194,8 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session): assert resp.json == { "dag_runs": [ { - "conf": None, - "conf_is_json": False, + "params": None, + "params_is_json": False, "data_interval_end": "2016-01-02T00:00:00+00:00", "data_interval_start": "2016-01-01T00:00:00+00:00", "end_date": timezone.utcnow().isoformat(), @@ -210,8 +210,8 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session): "state": "success", }, { - "conf": None, - "conf_is_json": False, + "params": None, + "params_is_json": False, "data_interval_end": "2016-01-03T00:00:00+00:00", "data_interval_start": "2016-01-02T00:00:00+00:00", "end_date": None, diff --git a/tests/www/views/test_views_trigger_dag.py b/tests/www/views/test_views_trigger_dag.py index 74d9c120d83d7..3c9e892c6e4fc 100644 --- a/tests/www/views/test_views_trigger_dag.py +++ b/tests/www/views/test_views_trigger_dag.py @@ -70,24 +70,27 @@ def test_duplicate_run_id(admin_client): check_content_in_response(f"The run ID {run_id} already exists", response) -def 
test_trigger_dag_conf(admin_client): - test_dag_id = "example_bash_operator" - conf_dict = {"string": "Hello, World!"} +def test_trigger_dag_params(admin_client, dag_maker, monkeypatch, session): + test_dag_id = "params_dag" + params_dict = {"string": "Hello, World!"} - admin_client.post(f"trigger?dag_id={test_dag_id}", data={"conf": json.dumps(conf_dict)}) + param = Param("default", type="string") + with monkeypatch.context(): + with dag_maker(dag_id=test_dag_id, serialized=True, session=session, params={"string": param}): + EmptyOperator(task_id="task1") - with create_session() as session: - run = session.query(DagRun).filter(DagRun.dag_id == test_dag_id).first() + admin_client.post(f"trigger?dag_id={test_dag_id}", data={"params": json.dumps(params_dict)}) + run = session.query(DagRun).filter(DagRun.dag_id == test_dag_id).first() assert run is not None assert DagRunType.MANUAL in run.run_id assert run.run_type == DagRunType.MANUAL - assert run.conf == conf_dict + assert run.params == params_dict -def test_trigger_dag_conf_malformed(admin_client): +def test_trigger_dag_params_malformed(admin_client): test_dag_id = "example_bash_operator" - response = admin_client.post(f"trigger?dag_id={test_dag_id}", data={"conf": '{"a": "b"'}) + response = admin_client.post(f"trigger?dag_id={test_dag_id}", data={"params": '{"a": "b"'}) check_content_in_response("Invalid JSON configuration", response) with create_session() as session: @@ -95,10 +98,10 @@ def test_trigger_dag_conf_not_dict(admin_client): +def test_trigger_dag_params_not_dict(admin_client): test_dag_id = "example_bash_operator" - response = admin_client.post(f"trigger?dag_id={test_dag_id}", data={"conf": "string and not a dict"}) + response = admin_client.post(f"trigger?dag_id={test_dag_id}", data={"params": "string and not a dict"}) check_content_in_response("must be a dict", response) with create_session() as session: @@ -169,34 +172,32 @@ def 
test_trigger_dag_form_origin_url(admin_client, test_origin, expected_origin) @@ -169,34 +172,32 @@ @pytest.mark.parametrize( - "request_conf, expected_conf", + "request_params, expected_params", [ (None, {"example_key": "example_value"}), ({"other": "test_data", "key": 12}, {"other": "test_data", "key": 12}), ], ) -def test_trigger_dag_params_conf(admin_client, request_conf, expected_conf): +def test_trigger_dag_params(admin_client, request_params, expected_params): """ Test that textarea in Trigger DAG UI is pre-populated - with json config when the conf URL parameter is passed, - or if a params dict is passed in the DAG - - 1. Conf is not included in URL parameters -> DAG.conf is in textarea - 2. Conf is passed as a URL parameter -> passed conf json is in textarea + with json params when the params URL parameter is passed + 1. Params is not included in URL parameters -> DAG.params is in textarea + 2. Params is passed as a URL parameter -> passed params json is in textarea """ test_dag_id = "example_bash_operator" doc_md = "Example Bash Operator" - if not request_conf: + if not request_params: resp = admin_client.get(f"trigger?dag_id={test_dag_id}") else: - test_request_conf = json.dumps(request_conf, indent=4) - resp = admin_client.get(f"trigger?dag_id={test_dag_id}&conf={test_request_conf}&doc_md={doc_md}") + test_request_params = json.dumps(request_params, indent=4) + resp = admin_client.get(f"trigger?dag_id={test_dag_id}&params={test_request_params}&doc_md={doc_md}") - expected_dag_conf = json.dumps(expected_conf, indent=4).replace('"', "&quot;") + expected_dag_params = json.dumps(expected_params, indent=4).replace('"', "&quot;") check_content_in_response( - f'', + f'', resp, ) @@ -207,8 +208,8 @@ def test_trigger_dag_params_render(admin_client, dag_maker, session, app, monkey with param value set in DAG. 
""" account = {"name": "account_name_1", "country": "usa"} - expected_conf = {"accounts": [account]} - expected_dag_conf = json.dumps(expected_conf, indent=4).replace('"', """) + expected_params = {"accounts": [account]} + expected_dag_params = json.dumps(expected_params, indent=4).replace('"', """) DAG_ID = "params_dag" param = Param( [account], @@ -231,7 +232,7 @@ def test_trigger_dag_params_render(admin_client, dag_maker, session, app, monkey resp = admin_client.get(f"trigger?dag_id={DAG_ID}") check_content_in_response( - f'', resp + f'', resp ) From f711ea7bc94bed84f349b69dbdbc0a3dfcac575e Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 4 Feb 2023 14:45:02 +0100 Subject: [PATCH 12/14] update core test and validate params + dagrun conf for bc --- airflow/models/dag.py | 6 ++++-- tests/models/test_dag.py | 9 ++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index e462298402e41..1ccc08ee0bd44 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2550,13 +2550,15 @@ def add_logger_if_needed(ti: TaskInstance): # Remove the local variables we have added to the secrets_backend_list secrets_backend_list.pop(0) - def _check_params(self, dag_run_params): + def _check_params(self, dag_run_params, dag_run_conf=None): """ Validates & raise exception if there are any extra provided Params not defined in the dag, missing Params which don't have a default value in the dag, or invalid params """ if dag_run_params is None: dag_run_params = {} + if conf.getboolean("core", "dag_run_conf_overrides_params") and dag_run_conf: + dag_run_params.update(**dag_run_conf) for k, param in self.params.items(): # As type can be an array, we would check if `null` is an allowed type or not if not param.has_value and ("type" not in param.schema or "null" not in param.schema["type"]): @@ -2668,7 +2670,7 @@ def create_dagrun( "dag_run conf is deprecated. 
Please use params instead", DeprecationWarning, stacklevel=2 ) - self._check_params(params) + self._check_params(params, conf) # create a copy of params before validating copied_params = copy.deepcopy(self.params) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index f1a043e1ea021..26edaecf3866a 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -2174,7 +2174,10 @@ def test_replace_outdated_access_control_actions(self): def test_validate_params_on_trigger_dag(self): dag = models.DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")}) - with pytest.raises(ParamValidationError, match="No value passed and Param has no default value"): + with pytest.raises( + ParamValidationError, + match="You should provide a value for the required params without default value: param1", + ): dag.create_dagrun( run_id="test_dagrun_missing_param", state=State.RUNNING, @@ -2189,7 +2192,7 @@ def test_validate_params_on_trigger_dag(self): run_id="test_dagrun_missing_param", state=State.RUNNING, execution_date=TEST_DATE, - conf={"param1": None}, + params={"param1": None}, ) dag = models.DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")}) @@ -2197,7 +2200,7 @@ def test_validate_params_on_trigger_dag(self): run_id="test_dagrun_missing_param", state=State.RUNNING, execution_date=TEST_DATE, - conf={"param1": "hello"}, + params={"param1": "hello"}, ) def test_return_date_range_with_num_method(self): From fe1b4d9faa078385d37ba7f553ec7c4f444dc6ae Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 4 Feb 2023 15:42:52 +0100 Subject: [PATCH 13/14] update CLI tests --- tests/cli/commands/test_dag_command.py | 33 +++++++++++++++++--------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 419b9bdccc9e2..03fc4194a4a31 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -119,6 
+119,7 @@ def test_backfill(self, mock_run): start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, conf=None, + params=None, delay_on_limit_secs=1.0, donot_pickle=False, ignore_first_depends_on_past=True, @@ -192,6 +193,7 @@ def test_backfill(self, mock_run): start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, conf=None, + params=None, delay_on_limit_secs=1.0, donot_pickle=False, ignore_first_depends_on_past=True, @@ -328,6 +330,7 @@ def test_cli_backfill_depends_on_past(self, mock_run): start_date=run_date, end_date=run_date, conf=None, + params=None, delay_on_limit_secs=1.0, donot_pickle=False, ignore_first_depends_on_past=True, @@ -369,6 +372,7 @@ def test_cli_backfill_depends_on_past_backwards(self, mock_run): start_date=start_date, end_date=end_date, conf=None, + params=None, delay_on_limit_secs=1.0, donot_pickle=False, ignore_first_depends_on_past=True, @@ -550,7 +554,7 @@ def test_trigger_dag(self): "example_bash_operator", "--run-id=test_trigger_dag", "--exec-date=2021-06-04T09:00:00+08:00", - '--conf={"foo": "bar"}', + '--params={"example_key": "some_value"}', ], ), ) @@ -560,7 +564,7 @@ def test_trigger_dag(self): assert dagrun, "DagRun not created" assert dagrun.run_type == DagRunType.MANUAL assert dagrun.external_trigger - assert dagrun.conf == {"foo": "bar"} + assert dagrun.params == {"example_key": "some_value"} # Coerced to UTC. 
assert dagrun.execution_date.isoformat(timespec="seconds") == "2021-06-04T01:00:00+00:00" @@ -592,7 +596,7 @@ def test_trigger_dag_with_microseconds(self): assert dagrun.external_trigger assert dagrun.execution_date.isoformat(timespec="microseconds") == "2021-06-04T01:00:00.000001+00:00" - def test_trigger_dag_invalid_conf(self): + def test_trigger_dag_invalid_params(self): with pytest.raises(ValueError): dag_command.dag_trigger( self.parser.parse_args( @@ -602,7 +606,7 @@ def test_trigger_dag_invalid_conf(self): "example_bash_operator", "--run-id", "trigger_dag_xxx", - "--conf", + "--params", "NOT JSON", ] ), @@ -654,7 +658,10 @@ def test_dag_test(self, mock_get_dag): [ mock.call(subdir=cli_args.subdir, dag_id="example_bash_operator"), mock.call().test( - execution_date=timezone.parse(DEFAULT_DATE.isoformat()), run_conf=None, session=mock.ANY + execution_date=timezone.parse(DEFAULT_DATE.isoformat()), + run_conf=None, + params=None, + session=mock.ANY, ), ] ) @@ -673,20 +680,20 @@ def test_dag_test_no_execution_date(self, mock_utcnow, mock_get_dag): mock_get_dag.assert_has_calls( [ mock.call(subdir=cli_args.subdir, dag_id="example_bash_operator"), - mock.call().test(execution_date=mock.ANY, run_conf=None, session=mock.ANY), + mock.call().test(execution_date=mock.ANY, run_conf=None, params=None, session=mock.ANY), ] ) @mock.patch("airflow.cli.commands.dag_command.get_dag") - def test_dag_test_conf(self, mock_get_dag): + def test_dag_test_params(self, mock_get_dag): cli_args = self.parser.parse_args( [ "dags", "test", "example_bash_operator", DEFAULT_DATE.isoformat(), - "-c", - '{"dag_run_conf_param": "param_value"}', + "-p", + '{"param": "param_value"}', ] ) dag_command.dag_test(cli_args) @@ -696,7 +703,8 @@ def test_dag_test_conf(self, mock_get_dag): mock.call(subdir=cli_args.subdir, dag_id="example_bash_operator"), mock.call().test( execution_date=timezone.parse(DEFAULT_DATE.isoformat()), - run_conf={"dag_run_conf_param": "param_value"}, + run_conf=None, + 
params={"param": "param_value"}, session=mock.ANY, ), ] @@ -717,7 +725,10 @@ def test_dag_test_show_dag(self, mock_get_dag, mock_render_dag): [ mock.call(subdir=cli_args.subdir, dag_id="example_bash_operator"), mock.call().test( - execution_date=timezone.parse(DEFAULT_DATE.isoformat()), run_conf=None, session=mock.ANY + execution_date=timezone.parse(DEFAULT_DATE.isoformat()), + run_conf=None, + params=None, + session=mock.ANY, ), ] ) From a93365616c7830ec4672476e7c6ee345ffd756f6 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 5 Feb 2023 02:03:41 +0100 Subject: [PATCH 14/14] some fixes on trigger UI and restore conf on some views --- airflow/models/dag.py | 2 +- airflow/www/forms.py | 3 + airflow/www/static/js/api/useGridData.test.ts | 2 + .../static/js/dag/grid/dagRuns/index.test.tsx | 6 ++ airflow/www/static/js/dag/grid/index.test.tsx | 2 + airflow/www/static/js/trigger.js | 24 +++--- airflow/www/static/js/types/index.ts | 4 + airflow/www/static/js/utils/index.test.ts | 2 + airflow/www/templates/airflow/trigger.html | 76 +++++----------- airflow/www/utils.py | 23 ++--- airflow/www/views.py | 86 ++++++++++++++----- tests/www/views/test_views_trigger_dag.py | 5 +- 12 files changed, 134 insertions(+), 101 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 1ccc08ee0bd44..9e8259cd377ab 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2558,7 +2558,7 @@ def _check_params(self, dag_run_params, dag_run_conf=None): if dag_run_params is None: dag_run_params = {} if conf.getboolean("core", "dag_run_conf_overrides_params") and dag_run_conf: - dag_run_params.update(**dag_run_conf) + dag_run_params.update(**{k: dag_run_conf[k] for k in self.params.keys()}) for k, param in self.params.items(): # As type can be an array, we would check if `null` is an allowed type or not if not param.has_value and ("type" not in param.schema or "null" not in param.schema["type"]): diff --git a/airflow/www/forms.py b/airflow/www/forms.py index 
c90da3706d277..fbf7e8908341c 100644 --- a/airflow/www/forms.py +++ b/airflow/www/forms.py @@ -137,6 +137,7 @@ class DagRunEditForm(DynamicForm): lazy_gettext("Logical Date"), widget=AirflowDateTimePickerROWidget(), ) + conf = TextAreaField(lazy_gettext("Conf"), widget=BS3TextAreaROWidget()) params = TextAreaField(lazy_gettext("Params"), widget=BS3TextAreaROWidget()) note = TextAreaField(lazy_gettext("User Note"), widget=BS3TextAreaFieldWidget()) @@ -144,6 +145,8 @@ def populate_obj(self, item): """Populates the attributes of the passed obj with data from the form's fields.""" super().populate_obj(item) item.run_type = DagRunType.from_run_id(item.run_id) + if item.conf: + item.conf = json.loads(item.conf) if item.params: item.params = json.loads(item.params) diff --git a/airflow/www/static/js/api/useGridData.test.ts b/airflow/www/static/js/api/useGridData.test.ts index 2fd78aa3ba268..b75a68b9a9e97 100644 --- a/airflow/www/static/js/api/useGridData.test.ts +++ b/airflow/www/static/js/api/useGridData.test.ts @@ -33,6 +33,8 @@ const commonDagRunParams = { endDate: null, lastSchedulingDecision: null, externalTrigger: false, + conf: null, + confIsJson: false, params: null, paramsIsJson: false, note: '', diff --git a/airflow/www/static/js/dag/grid/dagRuns/index.test.tsx b/airflow/www/static/js/dag/grid/dagRuns/index.test.tsx index 69b57787e8b69..1680ac1ddfcca 100644 --- a/airflow/www/static/js/dag/grid/dagRuns/index.test.tsx +++ b/airflow/www/static/js/dag/grid/dagRuns/index.test.tsx @@ -43,6 +43,8 @@ const generateRuns = (length: number): DagRun[] => ( lastSchedulingDecision: datestring, executionDate: datestring, externalTrigger: false, + conf: null, + confIsJson: false, params: null, paramsIsJson: false, note: 'someRandomValue', @@ -64,6 +66,8 @@ describe('Test DagRuns', () => { executionDate: '2021-11-08T21:14:19.704433+00:00', lastSchedulingDecision: datestring, externalTrigger: false, + conf: null, + confIsJson: false, params: null, paramsIsJson: false, note: 
'someRandomValue', @@ -80,6 +84,8 @@ describe('Test DagRuns', () => { executionDate: '2021-11-08T21:14:19.704433+00:00', lastSchedulingDecision: datestring, externalTrigger: false, + conf: null, + confIsJson: false, params: null, paramsIsJson: false, note: 'someRandomValue', diff --git a/airflow/www/static/js/dag/grid/index.test.tsx b/airflow/www/static/js/dag/grid/index.test.tsx index 746c7123b7fd8..82c0ed55490a3 100644 --- a/airflow/www/static/js/dag/grid/index.test.tsx +++ b/airflow/www/static/js/dag/grid/index.test.tsx @@ -102,6 +102,8 @@ const mockGridData = { lastSchedulingDecision: '2021-11-08T21:14:19.704433+00:00', note: 'myCoolDagRun', externalTrigger: false, + conf: null, + confIsJson: false, params: null, paramsIsJson: false, }, diff --git a/airflow/www/static/js/trigger.js b/airflow/www/static/js/trigger.js index 9cff308a8e6f6..e12e27efc5994 100644 --- a/airflow/www/static/js/trigger.js +++ b/airflow/www/static/js/trigger.js @@ -24,9 +24,9 @@ const objectFields = new Map(); const recentConfigList = document.getElementById('recent_configs'); /** - * Update the generated JSON DagRun.conf JSON field if any field changed + * Update the hidden textarea DagRun.params field if any field changed */ -function updateJSONconf() { +function updateJSONparams() { const jsonStart = document.getElementById('json_start').value; const params = JSON.parse(jsonStart); const elements = document.getElementById('trigger_form'); @@ -104,12 +104,12 @@ function initForm() { gutters: ['CodeMirror-lint-markers'], lint: true, }); - field.on('blur', updateJSONconf); + field.on('blur', updateJSONparams); objectFields.set(elements[i].name, field); } else if (elements[i].type === 'checkbox') { - elements[i].addEventListener('change', updateJSONconf); + elements[i].addEventListener('change', updateJSONparams); } else { - elements[i].addEventListener('blur', updateJSONconf); + elements[i].addEventListener('blur', updateJSONparams); } } } @@ -131,7 +131,7 @@ function initForm() { // 
Validate JSON entry fields before submission elements.addEventListener('submit', (event) => { - updateJSONconf(); + updateJSONparams(); objectFields.forEach((cm) => { const textValue = cm.getValue(); try { @@ -147,18 +147,18 @@ function initForm() { }); }); - // Ensure layout is refreshed on generated JSON as well - document.getElementById('generated_json_toggle').addEventListener('click', () => { - setTimeout(jsonForm.refresh, 300); - }); + // // Ensure layout is refreshed on generated JSON as well + // document.getElementById('generated_json_toggle').addEventListener('click', () => { + // setTimeout(jsonForm.refresh, 300); + // }); // Update generated conf once - setTimeout(updateJSONconf, 100); + setTimeout(updateJSONparams, 100); } } initForm(); -window.updateJSONconf = updateJSONconf; +window.updateJSONparams = updateJSONparams; function setRecentConfig(e) { const dropdownValue = e.target.value; diff --git a/airflow/www/static/js/types/index.ts b/airflow/www/static/js/types/index.ts index afcfdd968d35d..84fd3ac490df1 100644 --- a/airflow/www/static/js/types/index.ts +++ b/airflow/www/static/js/types/index.ts @@ -54,6 +54,10 @@ interface DagRun { endDate: string | null; lastSchedulingDecision: string | null; externalTrigger: boolean; + + conf: string | null; + + confIsJson: boolean; params: string | null; paramsIsJson: boolean; note: string | null; diff --git a/airflow/www/static/js/utils/index.test.ts b/airflow/www/static/js/utils/index.test.ts index 6bd04aa9ffcc4..1cae1f1529f0d 100644 --- a/airflow/www/static/js/utils/index.test.ts +++ b/airflow/www/static/js/utils/index.test.ts @@ -130,6 +130,8 @@ describe('Test getDagRunLabel', () => { executionDate: '2021-12-09T21:14:19.704433+00:00', lastSchedulingDecision: '2021-11-08T21:14:19.704433+00:00', externalTrigger: false, + conf: null, + confIsJson: false, params: null, paramsIsJson: false, note: 'someRandomValue', diff --git a/airflow/www/templates/airflow/trigger.html 
b/airflow/www/templates/airflow/trigger.html index 407a036c49850..c5c61c491ff72 100644 --- a/airflow/www/templates/airflow/trigger.html +++ b/airflow/www/templates/airflow/trigger.html @@ -71,7 +71,7 @@
    {% elif "enum" in form_details.schema and form_details.schema.enum %} - {% if recent_params|length > 0 %} + {% if form_fields and recent_params|length > 0 %}
    - + {% for run_id, recent_param in recent_params.items() %} {% endfor %}
    + {% elif recent_confs|length > 0 %} +
    +
    + + +
    +
    {% endif %} {%- if form_fields %}
    @@ -148,7 +160,7 @@

    Trigger DAG: {{ dag_id }}

    DAG Parameters

    - - - - + - {%- else %} - - {%- endif %} -

    - To access parameters in your DAG use {{ '{{ params }}' }}. -

    +
    + + +