From 9264e31ab080d59c49491324566d10f20be372ee Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Thu, 7 Mar 2024 11:53:54 +0400 Subject: [PATCH 1/4] Make current working directory as templated field in BashOperator This reverts commit 35fef2befb1e95ffecd0d1c254544e69268d91b7. --- airflow/operators/bash.py | 6 ++++-- tests/operators/test_bash.py | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py index ff8edcba51aa7..f1edb0d46d91c 100644 --- a/airflow/operators/bash.py +++ b/airflow/operators/bash.py @@ -59,8 +59,10 @@ class BashOperator(BaseOperator): :param skip_on_exit_code: If task exits with this exit code, leave the task in ``skipped`` state (default: 99). If set to ``None``, any non-zero exit code will be treated as a failure. - :param cwd: Working directory to execute the command in. + :param cwd: Working directory to execute the command in (templated). If None (default), the command is run in a temporary directory. + To use current DAG folder as the working directory, + you might set template ``{{ dag_run.dag.folder }}``. Airflow will evaluate the exit code of the Bash command. In general, a non-zero exit code will result in task failure and zero will result in task success. @@ -130,7 +132,7 @@ class BashOperator(BaseOperator): """ - template_fields: Sequence[str] = ("bash_command", "env") + template_fields: Sequence[str] = ("bash_command", "env", "cwd") template_fields_renderers = {"bash_command": "bash", "env": "json"} template_ext: Sequence[str] = (".sh", ".bash") ui_color = "#f0ede4" diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py index d00477b11837a..7a52790bcb0b7 100644 --- a/tests/operators/test_bash.py +++ b/tests/operators/test_bash.py @@ -20,6 +20,7 @@ import os import signal from datetime import datetime, timedelta +from pathlib import Path from time import sleep from unittest import mock @@ -244,3 +245,22 @@ def test_bash_operator_kill(self, dag_maker): os.kill(proc.pid, signal.SIGTERM) assert False, "BashOperator's subprocess still running after stopping on timeout!" break + + @pytest.mark.db_test + def test_templated_fields(self, create_task_instance_of_operator): + ti = create_task_instance_of_operator( + BashOperator, + # Templated fields + bash_command='echo "{{ dag_run.dag_id }}"', + env={"FOO": "{{ ds }}"}, + cwd="{{ dag_run.dag.folder }}", + # Other parameters + dag_id="test_templated_fields_dag", + task_id="test_templated_fields_task", + execution_date=timezone.datetime(2024, 2, 1, tzinfo=timezone.utc), + ) + ti.render_templates() + task: BashOperator = ti.task + assert task.bash_command == 'echo "test_templated_fields_dag"' + assert task.env == {"FOO": "2024-02-01"} + assert task.cwd == Path(__file__).absolute().parent.as_posix() From 66dfe274073dc31f17d25b90c1fe0bf16de68b48 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Thu, 7 Mar 2024 12:13:02 +0400 Subject: [PATCH 2/4] Fix Core and Serialization tests --- tests/models/test_renderedtifields.py | 8 +++++--- tests/serialization/test_dag_serialization.py | 6 +++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index 0f733bc8c1544..60a156d1908c7 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -137,9 +137,11 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da session.add(rtif) session.flush() - assert {"bash_command": expected_rendered_field, "env": None} == RTIF.get_templated_fields( - ti=ti, session=session - ) + assert { + "bash_command": expected_rendered_field, + "env": None, + "cwd": None, + } == RTIF.get_templated_fields(ti=ti, session=session) # Test the else part of get_templated_fields # i.e. for the TIs that are not stored in RTIF table # Fetching them will return None diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 3b8fe1e7b0500..47a284e93f738 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -169,7 +169,7 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i "ui_color": "#f0ede4", "ui_fgcolor": "#000", "template_ext": [".sh", ".bash"], - "template_fields": ["bash_command", "env"], + "template_fields": ["bash_command", "env", "cwd"], "template_fields_renderers": {"bash_command": "bash", "env": "json"}, "bash_command": "echo {{ task.task_id }}", "_task_type": "BashOperator", @@ -2150,7 +2150,7 @@ def test_operator_expand_serde(): }, "task_id": "a", "operator_extra_links": [], - "template_fields": ["bash_command", "env"], + "template_fields": ["bash_command", "env", "cwd"], "template_ext": [".sh", ".bash"], "template_fields_renderers": {"bash_command": "bash", "env": "json"}, "ui_color": "#f0ede4", @@ -2168,7 +2168,7 @@ def test_operator_expand_serde(): "downstream_task_ids": [], "task_id": "a", "template_ext": [".sh", ".bash"], - "template_fields": ["bash_command", "env"], + "template_fields": ["bash_command", "env", "cwd"], "template_fields_renderers": {"bash_command": "bash", "env": "json"}, "ui_color": "#f0ede4", "ui_fgcolor": "#000", From 50588e92334041d7124f57dfe173b323b74a74b6 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Thu, 7 Mar 2024 12:56:28 +0400 Subject: [PATCH 3/4] Fix TestRenderedTaskInstanceFields.test_write --- tests/models/test_renderedtifields.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index 60a156d1908c7..623a73054ddf7 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -263,7 +263,7 @@ def test_write(self, dag_maker): ) .first() ) - assert ("test_write", "test", {"bash_command": "echo test_val", "env": None}) == result + assert ("test_write", "test", {"bash_command": "echo test_val", "env": None, "cwd": None}) == result # Test that overwrite saves new values to the DB Variable.delete("test_key") @@ -289,7 +289,7 @@ def test_write(self, dag_maker): assert ( "test_write", "test", - {"bash_command": "echo test_val_updated", "env": None}, + {"bash_command": "echo test_val_updated", "env": None, "cwd": None}, ) == result_updated @mock.patch.dict(os.environ, {"AIRFLOW_VAR_API_KEY": "secret"}) From 751f9092ee1d55fcd6f5d902ed3a3557263d16ca Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Thu, 7 Mar 2024 14:42:39 +0400 Subject: [PATCH 4/4] Fixup additional tests --- tests/models/test_renderedtifields.py | 7 +++++-- tests/models/test_taskinstance.py | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index 623a73054ddf7..2652218e1d42d 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -303,8 +303,10 @@ def test_redact(self, redact, dag_maker): ) dr = dag_maker.create_dagrun() redact.side_effect = [ - "val 1", - "val 2", + # Order depends on order in Operator template_fields + "val 1", # bash_command + "val 2", # env + "val 3", # cwd ] ti = dr.task_instances[0] @@ -313,4 +315,5 @@ def test_redact(self, redact, dag_maker): assert rtif.rendered_fields == { "bash_command": "val 1", "env": "val 2", + "cwd": "val 3", } diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 8431381a6ff92..0d9d4df0f4f7c 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -636,7 +636,11 @@ def test_retry_handling(self, dag_maker): """ Test that task retries are handled properly """ - expected_rendered_ti_fields = {"env": None, "bash_command": "echo test_retry_handling; exit 1"} + expected_rendered_ti_fields = { + "env": None, + "bash_command": "echo test_retry_handling; exit 1", + "cwd": None, + } with dag_maker(dag_id="test_retry_handling") as dag: task = BashOperator(