Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make current working directory as templated field in BashOperator #37928

Merged
merged 2 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions airflow/operators/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ class BashOperator(BaseOperator):
:param skip_on_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: 99). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param cwd: Working directory to execute the command in.
:param cwd: Working directory to execute the command in (templated).
If None (default), the command is run in a temporary directory.
To use current DAG folder as the working directory,
you might set template ``{{ dag_run.dag.folder }}``.

Airflow will evaluate the exit code of the Bash command. In general, a non-zero exit code will result in
task failure and zero will result in task success.
Expand Down Expand Up @@ -130,7 +132,7 @@ class BashOperator(BaseOperator):

"""

template_fields: Sequence[str] = ("bash_command", "env")
template_fields: Sequence[str] = ("bash_command", "env", "cwd")
template_fields_renderers = {"bash_command": "bash", "env": "json"}
template_ext: Sequence[str] = (".sh", ".bash")
ui_color = "#f0ede4"
Expand Down
20 changes: 20 additions & 0 deletions tests/operators/test_bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import signal
from datetime import datetime, timedelta
from pathlib import Path
from time import sleep
from unittest import mock

Expand Down Expand Up @@ -244,3 +245,22 @@ def test_bash_operator_kill(self, dag_maker):
os.kill(proc.pid, signal.SIGTERM)
assert False, "BashOperator's subprocess still running after stopping on timeout!"
break

@pytest.mark.db_test
def test_templated_fields(self, create_task_instance_of_operator):
ti = create_task_instance_of_operator(
BashOperator,
# Templated fields
bash_command='echo "{{ dag_run.dag_id }}"',
env={"FOO": "{{ ds }}"},
cwd="{{ dag_run.dag.folder }}",
# Other parameters
dag_id="test_templated_fields_dag",
task_id="test_templated_fields_task",
execution_date=timezone.datetime(2024, 2, 1, tzinfo=timezone.utc),
)
ti.render_templates()
task: BashOperator = ti.task
assert task.bash_command == 'echo "test_templated_fields_dag"'
assert task.env == {"FOO": "2024-02-01"}
assert task.cwd == Path(__file__).absolute().parent.as_posix()