Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow optional defaults in required fields with manual triggered dags #31301

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 8 additions & 16 deletions airflow/example_dags/example_params_ui_tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,10 @@
import datetime
import json
from pathlib import Path
from typing import TYPE_CHECKING

from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models.dag import DAG
from airflow.models.param import Param

if TYPE_CHECKING:
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.models.param import Param, ParamsDict

with DAG(
dag_id=Path(__file__).stem,
Expand Down Expand Up @@ -170,9 +164,11 @@
),
# Fields can be required or not. If the defined fields are typed they are getting required by default
# (else they would not pass JSON schema validation) - to make typed fields optional you must
# permit the optional "null" type
# permit the optional "null" type.
# You can omit a default value if the DAG is triggered manually
"required_field": Param(
"You can not trigger if no text is given here!",
# In this example we have no default value
# Form will enforce a value supplied by users to be able to trigger
type="string",
title="Required text field",
description="This field is required. You can not submit without having text in here.",
Expand Down Expand Up @@ -303,13 +299,9 @@
},
) as dag:

@task(task_id="show_params")
@task
def show_params(**kwargs) -> None:
ti: TaskInstance = kwargs["ti"]
dag_run: DagRun = ti.dag_run
if not dag_run.conf:
print("Uups, no parameters supplied as DagRun.conf, was the trigger w/o form?")
raise AirflowSkipException("No DagRun.conf parameters supplied.")
print(f"This DAG was triggered with the following parameters:\n{json.dumps(dag_run.conf, indent=4)}")
params: ParamsDict = kwargs["params"]
print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")
jscheffl marked this conversation as resolved.
Show resolved Hide resolved

show_params()
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ def validate(self):
f"inconsistent schedule: timetable {self.timetable.summary!r} "
f"does not match schedule_interval {self.schedule_interval!r}",
)
self.params.validate()
self.validate_schedule_and_params()
self.timetable.validate()
self.validate_setup_teardown()

Expand Down
11 changes: 10 additions & 1 deletion docs/apache-airflow/core-concepts/params.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ JSON Schema Validation
},
):

.. note::
If ``schedule`` is defined for a DAG, params with defaults must be valid. This is validated during DAG parsing.
If ``schedule=None`` then params are not validated during DAG parsing but before triggering a DAG.
This is useful in cases where the DAG author does not want to provide defaults but wants to force users provide valid parameters
at time of trigger.

.. note::
As of now, for security reasons, one can not use :class:`~airflow.models.param.Param` objects derived out of custom classes. We are
planning to have a registration system for custom :class:`~airflow.models.param.Param` classes, just like we've for Operator ExtraLinks.
Expand Down Expand Up @@ -298,7 +304,6 @@ The following features are supported in the Trigger UI Form:
-
- ``Param(None, type=["null", "string"])``


- If a form field is left empty, it is passed as ``None`` value to the params dict.
- Form fields are rendered in the order of definition of ``params`` in the DAG.
- If you want to add sections to the Form, add the attribute ``section`` to each field. The text will be used as section label.
Expand All @@ -310,6 +315,10 @@ The following features are supported in the Trigger UI Form:
If you want to change values manually, the JSON configuration can be adjusted. Changes are overridden when form fields change.
- If you want to render custom HTML as form on top of the provided features, you can use the ``custom_html_form`` attribute.

.. note::
If the field is required the default value must be valid according to the schema as well. If the DAG is defined with
``schedule=None`` the parameter value validation is made at time of trigger.

For examples also please take a look to two example DAGs provided: ``example_params_trigger_ui`` and ``example_params_ui_tutorial``.

.. image:: ../img/trigger-dag-tutorial-form.png
Expand Down
2 changes: 1 addition & 1 deletion tests/dags/test_invalid_param.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
with DAG(
"test_invalid_param",
start_date=datetime(2021, 1, 1),
schedule="@once",
schedule="0 0 * * *",
params={
# a mandatory str param
"str_param": Param(type="string", minLength=2, maxLength=4),
Expand Down
45 changes: 45 additions & 0 deletions tests/dags/test_invalid_param2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator

with DAG(
"test_invalid_param2",
start_date=datetime(2021, 1, 1),
schedule="0 0 * * *",
params={
# a mandatory str param but pass None as value which is invalid
"str_param": Param(default=None, type="string", minLength=2, maxLength=4),
},
) as the_dag:

def print_these(*params):
for param in params:
print(param)

PythonOperator(
task_id="ref_params",
python_callable=print_these,
op_args=[
"{{ params.str_param }}",
],
)
45 changes: 45 additions & 0 deletions tests/dags/test_invalid_param3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator

with DAG(
"test_invalid_param3",
start_date=datetime(2021, 1, 1),
schedule="0 0 * * *",
params={
# a mandatory number param but pass a string as default value
"int_param": Param(default="banana", type="integer"),
},
) as the_dag:

def print_these(*params):
for param in params:
print(param)

PythonOperator(
task_id="ref_params",
python_callable=print_these,
op_args=[
"{{ params.int_param }}",
],
)
45 changes: 45 additions & 0 deletions tests/dags/test_invalid_param4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator

with DAG(
"test_invalid_param4",
start_date=datetime(2021, 1, 1),
schedule="0 0 * * *",
params={
# a mandatory string but the default is not valid in length validation
"str_param": Param(default="banana", type="string", minLength=2, maxLength=4),
},
) as the_dag:

def print_these(*params):
for param in params:
print(param)

PythonOperator(
task_id="ref_params",
python_callable=print_these,
op_args=[
"{{ params.str_param }}",
],
)
49 changes: 49 additions & 0 deletions tests/dags/test_valid_param.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator

with DAG(
"test_valid_param",
start_date=datetime(2021, 1, 1),
schedule=None,
params={
# a string default is not mandatory as DAG has no schedule
"str_param": Param(type="string", minLength=2, maxLength=4),
# a string with None as default is also accepted as no schedule
"str_param2": Param(None, type="string", minLength=2, maxLength=4),
# But of course adding a valid default is also fine
"str_param3": Param("valid_default", type="string", minLength=2, maxLength=15),
},
) as the_dag:

def print_these(*params):
for param in params:
print(param)

PythonOperator(
task_id="ref_params",
python_callable=print_these,
op_args=[
"{{ params.str_param }}",
],
)
47 changes: 47 additions & 0 deletions tests/dags/test_valid_param2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator

with DAG(
"test_valid_param2",
start_date=datetime(2021, 1, 1),
schedule="0 0 * * *",
params={
# mandatory string has default, this is how we want it!
"str_param": Param("some_default", type="string", minLength=2, maxLength=12),
# Field does not need to have a default if type is nullable
"optional_str_param": Param(None, type=["null", "string"]),
},
) as the_dag:

def print_these(*params):
for param in params:
print(param)

PythonOperator(
task_id="ref_params",
python_callable=print_these,
op_args=[
"{{ params.str_param }}",
],
)
3 changes: 3 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3110,6 +3110,9 @@ def test_list_py_file_paths(self):
"test_invalid_dup_task.py",
"test_ignore_this.py",
"test_invalid_param.py",
"test_invalid_param2.py",
"test_invalid_param3.py",
"test_invalid_param4.py",
"test_nested_dag.py",
"test_imports.py",
"__init__.py",
Expand Down
25 changes: 23 additions & 2 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,14 @@ def test_process_file_cron_validity_check(

def test_process_file_invalid_param_check(self, tmp_path):
"""
test if an invalid param in the dag param can be identified
test if an invalid param in the dags can be identified
"""
invalid_dag_files = ["test_invalid_param.py"]
invalid_dag_files = [
"test_invalid_param.py",
"test_invalid_param2.py",
"test_invalid_param3.py",
"test_invalid_param4.py",
]
dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)

assert len(dagbag.import_errors) == 0
Expand All @@ -295,6 +300,22 @@ def test_process_file_invalid_param_check(self, tmp_path):
assert len(dagbag.import_errors) == len(invalid_dag_files)
assert len(dagbag.dags) == 0

def test_process_file_valid_param_check(self, tmp_path):
"""
test if valid params in the dags param can be validated (positive test)
"""
valid_dag_files = [
"test_valid_param.py",
"test_valid_param2.py",
]
dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)

assert len(dagbag.import_errors) == 0
for file in valid_dag_files:
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, file))
assert len(dagbag.import_errors) == 0
assert len(dagbag.dags) == len(valid_dag_files)

@patch.object(DagModel, "get_current")
def test_get_dag_without_refresh(self, mock_dagmodel):
"""
Expand Down
Loading