diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 46580d6206a90..81b5854e57c88 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -21,6 +21,7 @@ import json import time from collections.abc import Sequence +from json import JSONDecodeError from typing import TYPE_CHECKING, Any from sqlalchemy import select @@ -202,9 +203,11 @@ def execute(self, context: Context): parsed_logical_date = timezone.parse(self.logical_date) try: + if self.conf and isinstance(self.conf, str): + self.conf = json.loads(self.conf) json.dumps(self.conf) - except TypeError: - raise ValueError("conf parameter should be JSON Serializable") + except (TypeError, JSONDecodeError): + raise ValueError("conf parameter should be JSON Serializable %s", self.conf) if self.trigger_run_id: run_id = str(self.trigger_run_id) diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index 1d72fbb670e87..f016e7906b26c 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -200,7 +200,7 @@ def test_trigger_dagrun_operator_templated_invalid_conf(self, dag_maker): dag_maker.sync_dagbag_to_db() parse_and_sync_to_db(self.f_name) dr = dag_maker.create_dagrun() - with pytest.raises(ValueError, match="^conf parameter should be JSON Serializable$"): + with pytest.raises(ValueError, match="conf parameter should be JSON Serializable"): dag_maker.run_ti(task.task_id, dr) def test_trigger_dagrun_with_no_failed_state(self, dag_maker): @@ -267,6 +267,40 @@ def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self): fail_when_dag_is_paused=True, ) + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3") + def test_trigger_dagrun_with_str_conf(self): + """ + Test TriggerDagRunOperator conf is proper json string formatted + """ + with time_machine.travel("2025-02-18T08:04:46Z", tick=False): + task = TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id=TRIGGERED_DAG_ID, + conf='{"foo": "bar"}', + ) + + # Ensure correct exception is raised + with pytest.raises(DagRunTriggerException) as exc_info: + task.execute(context={}) + + assert exc_info.value.trigger_dag_id == TRIGGERED_DAG_ID + assert exc_info.value.conf == {"foo": "bar"} + + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3") + def test_trigger_dagrun_with_str_conf_error(self): + """ + Test TriggerDagRunOperator conf is not proper json string formatted + """ + with time_machine.travel("2025-02-18T08:04:46Z", tick=False): + task = TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id=TRIGGERED_DAG_ID, + conf="{'foo': 'bar', 'key': 123}", + ) + + with pytest.raises(ValueError, match="conf parameter should be JSON Serializable"): + task.execute(context={}) + # TODO: To be removed once the provider drops support for Airflow 2 @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")