Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import json
import time
from collections.abc import Sequence
from json import JSONDecodeError
from typing import TYPE_CHECKING, Any

from sqlalchemy import select
Expand Down Expand Up @@ -202,9 +203,11 @@ def execute(self, context: Context):
parsed_logical_date = timezone.parse(self.logical_date)

try:
if self.conf and isinstance(self.conf, str):
self.conf = json.loads(self.conf)
json.dumps(self.conf)
except TypeError:
raise ValueError("conf parameter should be JSON Serializable")
except (TypeError, JSONDecodeError):
raise ValueError("conf parameter should be JSON Serializable %s", self.conf)

if self.trigger_run_id:
run_id = str(self.trigger_run_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def test_trigger_dagrun_operator_templated_invalid_conf(self, dag_maker):
dag_maker.sync_dagbag_to_db()
parse_and_sync_to_db(self.f_name)
dr = dag_maker.create_dagrun()
with pytest.raises(ValueError, match="^conf parameter should be JSON Serializable$"):
with pytest.raises(ValueError, match="conf parameter should be JSON Serializable"):
dag_maker.run_ti(task.task_id, dr)

def test_trigger_dagrun_with_no_failed_state(self, dag_maker):
Expand Down Expand Up @@ -267,6 +267,40 @@ def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self):
fail_when_dag_is_paused=True,
)

@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3")
def test_trigger_dagrun_with_str_conf(self):
"""
Test TriggerDagRunOperator conf is proper json string formatted
"""
with time_machine.travel("2025-02-18T08:04:46Z", tick=False):
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
conf='{"foo": "bar"}',
)

# Ensure correct exception is raised
with pytest.raises(DagRunTriggerException) as exc_info:
task.execute(context={})

assert exc_info.value.trigger_dag_id == TRIGGERED_DAG_ID
assert exc_info.value.conf == {"foo": "bar"}

@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3")
def test_trigger_dagrun_with_str_conf_error(self):
"""
Test TriggerDagRunOperator conf is not proper json string formatted
"""
with time_machine.travel("2025-02-18T08:04:46Z", tick=False):
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
conf="{'foo': 'bar', 'key': 123}",
)

with pytest.raises(ValueError, match="conf parameter should be JSON Serializable"):
task.execute(context={})


# TODO: To be removed once the provider drops support for Airflow 2
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
Expand Down
Loading