Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@


def _parse_file_entrypoint():
import os

import structlog

from airflow.sdk.execution_time import task_runner
Expand All @@ -87,6 +85,11 @@ def _parse_file_entrypoint():
task_runner.SUPERVISOR_COMMS = comms_decoder
log = structlog.get_logger(logger_name="task")

# Put bundle root on sys.path if needed. This allows the dag bundle to add
# code in util modules to be shared between files within the same bundle.
if (bundle_root := os.fspath(msg.bundle_path)) not in sys.path:
sys.path.append(bundle_root)

result = _parse_file(msg, log)
if result is not None:
comms_decoder.send_request(log, result)
Expand Down
7 changes: 0 additions & 7 deletions airflow-core/src/airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,12 +560,6 @@ def prepare_syspath_for_config_and_plugins():
sys.path.append(PLUGINS_FOLDER)


def prepare_syspath_for_dags_folder():
"""Update sys.path to include the DAGs folder."""
if DAGS_FOLDER not in sys.path:
sys.path.append(DAGS_FOLDER)


def import_local_settings():
"""Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
try:
Expand Down Expand Up @@ -612,7 +606,6 @@ def initialize():
# in airflow_local_settings to take precendec
load_policy_plugins(POLICY_PLUGIN_MANAGER)
import_local_settings()
prepare_syspath_for_dags_folder()
global LOGGING_CLASS_PATH
LOGGING_CLASS_PATH = configure_logging()

Expand Down
15 changes: 2 additions & 13 deletions airflow-core/tests/unit/core/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,16 @@ def teardown_method(self):

@mock.patch("airflow.settings.prepare_syspath_for_config_and_plugins")
@mock.patch("airflow.settings.import_local_settings")
@mock.patch("airflow.settings.prepare_syspath_for_dags_folder")
def test_initialize_order(
self,
mock_prepare_syspath_for_dags_folder,
mock_import_local_settings,
mock_prepare_syspath_for_config_and_plugins,
):
def test_initialize_order(self, mock_import_local_settings, mock_prepare_syspath_for_config_and_plugins):
"""
Tests that import_local_settings is called between prepare_syspath_for_config_and_plugins
and prepare_syspath_for_dags_folder
Tests that import_local_settings is called after prepare_syspath_for_config_and_plugins
"""
mock_local_settings = mock.Mock()

mock_local_settings.attach_mock(
mock_prepare_syspath_for_config_and_plugins, "prepare_syspath_for_config_and_plugins"
)
mock_local_settings.attach_mock(mock_import_local_settings, "import_local_settings")
mock_local_settings.attach_mock(
mock_prepare_syspath_for_dags_folder, "prepare_syspath_for_dags_folder"
)

import airflow.settings

Expand All @@ -119,7 +109,6 @@ def test_initialize_order(
expected_calls = [
call.prepare_syspath_for_config_and_plugins(),
call.import_local_settings(),
call.prepare_syspath_for_dags_folder(),
]

mock_local_settings.assert_has_calls(expected_calls)
Expand Down
34 changes: 30 additions & 4 deletions airflow-core/tests/unit/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def fake_collect_dags(dagbag: DagBag, *args, **kwargs):
assert resp.import_errors is not None
assert "a.py" in resp.import_errors

# @pytest.mark.execution_timeout(10)
def test_top_level_variable_access(
self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
):
Expand Down Expand Up @@ -271,9 +270,7 @@ def dag_in_a_fn():
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "test_my_conn"

def test_top_level_connection_access_not_found(
self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
):
def test_top_level_connection_access_not_found(self, tmp_path: pathlib.Path):
logger_filehandle = MagicMock()

def dag_in_a_fn():
Expand All @@ -297,6 +294,35 @@ def dag_in_a_fn():
if result.import_errors:
assert "CONNECTION_NOT_FOUND" in next(iter(result.import_errors.values()))

def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path):
tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")

dag1_path = tmp_path.joinpath("dag1.py")
dag1_code = """
from util import NAME
from airflow.sdk import DAG
with DAG(NAME):
pass
"""
dag1_path.write_text(textwrap.dedent(dag1_code))

proc = DagFileProcessorProcess.start(
id=1,
path=dag1_path,
bundle_path=tmp_path,
callbacks=[],
logger_filehandle=MagicMock(),
)
while not proc.is_ready:
proc._service_subprocess(0.1)

result = proc.parsing_result
assert result is not None
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "dag_name"


def write_dag_in_a_fn_to_file(fn: Callable[[], None], folder: pathlib.Path) -> pathlib.Path:
# Create the dag in a fn, and use inspect.getsource to write it to a file so that
Expand Down