Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,12 @@ function environment_initialization() {

cd "${AIRFLOW_SOURCES}"

# Temporarily add /opt/airflow/providers/standard/tests to PYTHONPATH in order to see example dags
# in the UI when testing in Breeze. This might be solved differently in the future
if [[ -d /opt/airflow/providers/standard/tests ]]; then
export PYTHONPATH=${PYTHONPATH=}:/opt/airflow/providers/standard/tests
fi

if [[ ${START_AIRFLOW:="false"} == "true" || ${START_AIRFLOW} == "True" ]]; then
export AIRFLOW__CORE__LOAD_EXAMPLES=${LOAD_EXAMPLES}
wait_for_asset_compilation
Expand Down
6 changes: 3 additions & 3 deletions airflow-core/docs/tutorial/taskflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ system-level packages. TaskFlow supports multiple execution environments to isol
Creates a temporary virtualenv at task runtime. Great for experimental or dynamic tasks, but may have cold start
overhead.

.. exampleinclude:: /../src/airflow/example_dags/example_python_decorator.py
.. exampleinclude:: /../../providers/standard/tests/system/standard/example_python_decorator.py
:language: python
:dedent: 4
:start-after: [START howto_operator_python_venv]
Expand All @@ -283,7 +283,7 @@ overhead.

Executes the task using a pre-installed Python interpreter — ideal for consistent environments or shared virtualenvs.

.. exampleinclude:: /../src/airflow/example_dags/example_python_decorator.py
.. exampleinclude:: /../../providers/standard/tests/system/standard/example_python_decorator.py
:language: python
:dedent: 4
:start-after: [START howto_operator_external_python]
Expand Down Expand Up @@ -333,7 +333,7 @@ Using Sensors
Use ``@task.sensor`` to build lightweight, reusable sensors using Python functions. These support both poke and reschedule
modes.

.. exampleinclude:: /../src/airflow/example_dags/example_sensor_decorator.py
.. exampleinclude:: /../../providers/standard/tests/system/standard/example_sensor_decorator.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]
Expand Down
26 changes: 26 additions & 0 deletions airflow-core/src/airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import os
from typing import TYPE_CHECKING

from airflow.configuration import conf
Expand All @@ -34,6 +35,7 @@
from airflow.dag_processing.bundles.base import BaseDagBundle

_example_dag_bundle_name = "example_dags"
_example_standard_dag_bundle_name = "example_standard_dags"


def _bundle_item_exc(msg):
Expand Down Expand Up @@ -80,6 +82,25 @@ def _add_example_dag_bundle(config_list):
)


def _add_example_standard_dag_bundle(config_list):
# TODO(potiuk): make it more generic - for now we only add standard example_dags if they are locally available
try:
from system import standard
except ImportError:
return

example_dag_folder = next(iter(standard.__path__))
config_list.append(
{
"name": _example_standard_dag_bundle_name,
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {
"path": example_dag_folder,
},
}
)


class DagBundlesManager(LoggingMixin):
"""Manager for DAG bundles."""

Expand Down Expand Up @@ -112,6 +133,11 @@ def parse_config(self) -> None:
_validate_bundle_config(config_list)
if conf.getboolean("core", "LOAD_EXAMPLES"):
_add_example_dag_bundle(config_list)
if (
os.environ.get("BREEZE", "").lower() == "true"
or os.environ.get("_IN_UNIT_TESTS", "").lower() == "true"
):
_add_example_standard_dag_bundle(config_list)

for cfg in config_list:
name = cfg["name"]
Expand Down
8 changes: 8 additions & 0 deletions airflow-core/src/airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,14 @@ def collect_dags(
example_dag_folder = next(iter(example_dags.__path__))

files_to_parse.extend(list_py_file_paths(example_dag_folder, safe_mode=safe_mode))
try:
from system import standard

example_dag_folder_standard = next(iter(standard.__path__))
files_to_parse.extend(list_py_file_paths(example_dag_folder_standard, safe_mode=safe_mode))
except ImportError:
# Nothing happens - this should only work during tests
pass

for filepath in files_to_parse:
try:
Expand Down
1 change: 0 additions & 1 deletion airflow-core/src/airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ def get_dag(bundle_names: list | None, dag_id: str, from_db: bool = False) -> DA

bundle_names = bundle_names or []
dag: DAG | None = None

if from_db:
dagbag = DagBag(read_dags_from_db=True)
dag = dagbag.get_dag(dag_id) # get_dag loads from the DB as requested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

pytestmark = pytest.mark.db_test

EXAMPLE_DAG_FILE = AIRFLOW_CORE_SOURCES_PATH / "airflow" / "example_dags" / "example_bash_operator.py"
TEST_DAG_ID = "example_bash_operator"
EXAMPLE_DAG_FILE = AIRFLOW_CORE_SOURCES_PATH / "airflow" / "example_dags" / "example_simplest_dag.py"
TEST_DAG_ID = "example_simplest_dag"
NOT_READABLE_DAG_ID = "latest_only_with_trigger"
TEST_MULTIPLE_DAGS_ID = "asset_produces_1"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from airflow.utils.file import list_py_file_paths

import system.standard
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@potiuk This looks incorrect. I don't really know what it's trying to do, but this is not meant to be a valid importable thing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not best idea it turned out. But it's fixed now. But in tests it's importable actually. That's the way how we import from other test modules in our tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More explanation. This is the way how we - currently - introduce "namespacing" in our tests - we had discussion about it before.

tests.
    unit.provider id
    system.provider id
    integration/ provider id

Example:

image

And the root PYTHONPATH is tests for each provider and airflow core.

One of the reasons for that was the problem with importing things like smtp or imap or a number of others - in many of those providers, the "library" used to import things start with the same smtp or imap. So adding tests.smtp like it was a problem.

We could have more sub-folders.

Maybe we should rethink it. I proposed iniitlllly to have provider_tests as top-level package - that would be more reasonable - but adds more nesting.

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_dags, parse_and_sync_to_db

Expand All @@ -37,8 +38,10 @@
def get_corresponding_dag_file_count(dir: str, include_examples: bool = True) -> int:
from airflow import example_dags

return len(list_py_file_paths(directory=dir)) + (
len(list_py_file_paths(next(iter(example_dags.__path__)))) if include_examples else 0
return (
len(list_py_file_paths(directory=dir))
+ (len(list_py_file_paths(next(iter(example_dags.__path__)))) if include_examples else 0)
+ (len(list_py_file_paths(next(iter(system.standard.__path__)))) if include_examples else 0)
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@

# Example bash operator located here: airflow/example_dags/example_bash_operator.py
EXAMPLE_DAG_FILE = (
AIRFLOW_REPO_ROOT_PATH / "airflow-core" / "src" / "airflow" / "example_dags" / "example_bash_operator.py"
AIRFLOW_REPO_ROOT_PATH / "airflow-core" / "src" / "airflow" / "example_dags" / "example_simplest_dag.py"
)
TEST_DAG_ID = "example_bash_operator"
TEST_DAG_ID = "example_simplest_dag"


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ def test_should_response_200_single_dag(self, test_client):
assert response.json() == {
"active_dag_count": 1,
"failed_dag_count": 0,
"running_dag_count": 1,
"queued_dag_count": 0,
"running_dag_count": 0,
"queued_dag_count": 1,
}

@pytest.mark.usefixtures("freeze_time_for_dagruns", "make_failed_dag_runs")
Expand Down
5 changes: 3 additions & 2 deletions airflow-core/tests/unit/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_runs, parse_and_sync_to_db

pytestmark = pytest.mark.db_test
Expand Down Expand Up @@ -74,7 +75,6 @@ def move_back(old_path, new_path):
shutil.move(new_path, old_path)


# TODO: Check if tests needs side effects - locally there's missing DAG
class TestCliTasks:
run_id = "TEST_RUN_ID"
dag_id = "example_python_operator"
Expand All @@ -91,7 +91,7 @@ def setup_class(cls):
cls.parser = cli_parser.get_parser()
clear_db_runs()

cls.dagbag = DagBag(read_dags_from_db=True)
cls.dagbag = DagBag(read_dags_from_db=True, include_examples=True)
cls.dag = cls.dagbag.get_dag(cls.dag_id)
data_interval = cls.dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE)
cls.dag_run = cls.dag.create_dagrun(
Expand All @@ -108,6 +108,7 @@ def setup_class(cls):
def teardown_class(cls) -> None:
clear_db_runs()

@conf_vars({("core", "load_examples"): "true"})
@pytest.mark.execution_timeout(120)
def test_cli_list_tasks(self):
for dag_id in self.dagbag.dags:
Expand Down
8 changes: 4 additions & 4 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from sqlalchemy import func, select, update
from sqlalchemy.orm import joinedload

import airflow.example_dags
from airflow import settings
from airflow.assets.manager import AssetManager
from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
Expand Down Expand Up @@ -75,6 +74,7 @@
from airflow.utils.thread_safe_dict import ThreadSafeDict
from airflow.utils.types import DagRunTriggeredByType, DagRunType

from system import standard
from tests_common.test_utils.asserts import assert_queries_count
from tests_common.test_utils.config import conf_vars, env_vars
from tests_common.test_utils.db import (
Expand Down Expand Up @@ -104,7 +104,7 @@
ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER, "elastic_dag.py")

TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
EXAMPLE_DAGS_FOLDER = airflow.example_dags.__path__[0]
EXAMPLE_STANDARD_DAGS_FOLDER = standard.__path__[0]
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
DEFAULT_LOGICAL_DATE = timezone.coerce_datetime(DEFAULT_DATE)
TRY_NUMBER = 1
Expand Down Expand Up @@ -5707,7 +5707,7 @@ def test_find_and_purge_task_instances_without_heartbeats_nothing(self):

@pytest.mark.usefixtures("testing_dag_bundle")
def test_find_and_purge_task_instances_without_heartbeats(self, session, create_dagrun):
dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, "example_branch_operator.py")
dagfile = os.path.join(EXAMPLE_STANDARD_DAGS_FOLDER, "example_branch_operator.py")
dagbag = DagBag(dagfile)
dag = dagbag.get_dag("example_branch_operator")
dm = LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
Expand Down Expand Up @@ -5773,7 +5773,7 @@ def test_task_instance_heartbeat_timeout_message(self, session, create_dagrun):
"""
Check that the task instance heartbeat timeout message comes out as expected
"""
dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, "example_branch_operator.py")
dagfile = os.path.join(EXAMPLE_STANDARD_DAGS_FOLDER, "example_branch_operator.py")
dagbag = DagBag(dagfile)
dag = dagbag.get_dag("example_branch_operator")
dm = LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
Expand Down
16 changes: 13 additions & 3 deletions airflow-core/tests/unit/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone as tz
from airflow.utils.session import create_session
from scripts.ci.pre_commit.common_precommit_utils import AIRFLOW_ROOT_PATH

from tests_common.test_utils import db
from tests_common.test_utils.asserts import assert_queries_count
Expand All @@ -52,6 +53,12 @@
pytestmark = pytest.mark.db_test

example_dags_folder = pathlib.Path(airflow.example_dags.__path__[0]) # type: ignore[attr-defined]
try:
import system.standard

example_standard_dags_folder = pathlib.Path(system.standard.__path__[0]) # type: ignore[attr-defined]
except ImportError:
example_standard_dags_folder = pathlib.Path(airflow.example_dags.__path__[0]) # type: ignore[attr-defined]

PY311 = sys.version_info >= (3, 11)

Expand Down Expand Up @@ -359,13 +366,16 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
("file_to_load", "expected"),
(
pytest.param(
pathlib.Path(example_dags_folder) / "example_bash_operator.py",
{"example_bash_operator": "airflow/example_dags/example_bash_operator.py"},
pathlib.Path(example_standard_dags_folder) / "example_bash_operator.py",
{
"example_bash_operator": f"{example_standard_dags_folder.relative_to(AIRFLOW_ROOT_PATH) / 'example_bash_operator.py'}"
},
id="example_bash_operator",
),
),
)
def test_get_dag_registration(self, file_to_load, expected):
pytest.importorskip("system.standard")
dagbag = DagBag(dag_folder=os.devnull, include_examples=False)
dagbag.process_file(os.fspath(file_to_load))
for dag_id, path in expected.items():
Expand Down Expand Up @@ -420,7 +430,7 @@ def test_refresh_py_dag(self, mock_dagmodel, tmp_path):
Test that we can refresh an ordinary .py DAG
"""
dag_id = "example_bash_operator"
fileloc = str(example_dags_folder / "example_bash_operator.py")
fileloc = str(example_standard_dags_folder / "example_bash_operator.py")

mock_dagmodel.return_value = DagModel()
mock_dagmodel.return_value.last_expired = datetime.max.replace(tzinfo=timezone.utc)
Expand Down
2 changes: 1 addition & 1 deletion clients/python/test_python_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@

# Make sure in the [core] section, the `load_examples` config is set to True in your airflow.cfg
# or AIRFLOW__CORE__LOAD_EXAMPLES environment variable set to True
DAG_ID = "example_bash_operator"
DAG_ID = "example_simplest_dag"


# Enter a context with an instance of the API client
Expand Down
9 changes: 1 addition & 8 deletions devel-common/src/docs/provider_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,8 @@
"operators/_partials",
"_api/airflow/index.rst",
"_api/airflow/providers/index.rst",
"_api/airflow/providers/apache/index.rst",
"_api/airflow/providers/atlassian/index.rst",
"_api/airflow/providers/cncf/index.rst",
"_api/airflow/providers/common/index.rst",
"_api/airflow/providers/common/messaging/providers/base_provider/index.rst",
"_api/airflow/providers/common/messaging/providers/sqs/index.rst",
"_api/airflow/providers/dbt/index.rst",
"_api/airflow/providers/microsoft/index.rst",
"_api/docs/conf",
*[f"_api/airflow/providers/{subpackage}/index.rst" for subpackage in empty_subpackages],
*[f"_api/system/{subpackage}/index.rst" for subpackage in empty_subpackages],
*[f"_api/tests/system/{subpackage}/index.rst" for subpackage in empty_subpackages],
]
Expand Down
11 changes: 11 additions & 0 deletions devel-common/src/sphinx_exts/docs_build/docs_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ def _src_dir(self) -> Path:
console.print(f"[red]Unknown package name: {self.package_name}")
sys.exit(1)

@property
def pythonpath(self) -> list[Path]:
path = []
if (self._src_dir.parent / "tests").exists():
path.append(self._src_dir.parent.joinpath("tests").resolve())
return path

@property
def _generated_api_dir(self) -> Path:
return self._build_dir.resolve() / "_api"
Expand Down Expand Up @@ -167,6 +174,8 @@ def check_spelling(self, verbose: bool) -> tuple[list[SpellingError], list[DocBu
console.print("[yellow]Command to run:[/] ", " ".join([shlex.quote(arg) for arg in build_cmd]))
env = os.environ.copy()
env["AIRFLOW_PACKAGE_NAME"] = self.package_name
if self.pythonpath:
env["PYTHONPATH"] = ":".join([path.as_posix() for path in self.pythonpath])
if verbose:
console.print(
f"[bright_blue]{self.package_name:60}:[/] The output is hidden until an error occurs."
Expand Down Expand Up @@ -246,6 +255,8 @@ def build_sphinx_docs(self, verbose: bool) -> list[DocBuildError]:
console.print("[yellow]Command to run:[/] ", " ".join([shlex.quote(arg) for arg in build_cmd]))
env = os.environ.copy()
env["AIRFLOW_PACKAGE_NAME"] = self.package_name
if self.pythonpath:
env["PYTHONPATH"] = ":".join([path.as_posix() for path in self.pythonpath])
if verbose:
console.print(
f"[bright_blue]{self.package_name:60}:[/] Running sphinx. "
Expand Down
Loading
Loading