diff --git a/Dockerfile.ci b/Dockerfile.ci
index f7d4a240dadcf..0ccd108b4eb77 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -1099,7 +1099,9 @@ function check_airflow_python_client_installation() {
 }
 
 function start_api_server_with_examples(){
-    if [[ ${START_API_SERVER_WITH_EXAMPLES=} != "true" ]]; then
+    # Skip starting the API server with examples unless START_API_SERVER_WITH_EXAMPLES
+    # is "true" or the TEST_GROUP env var equals "system"
+    if [[ ${START_API_SERVER_WITH_EXAMPLES=} != "true" && ${TEST_GROUP:=""} != "system" ]]; then
         return
     fi
     export AIRFLOW__CORE__LOAD_EXAMPLES=True
diff --git a/airflow-core/src/airflow/executors/workloads.py b/airflow-core/src/airflow/executors/workloads.py
index 1f4be22377e33..3da9fe3cede39 100644
--- a/airflow-core/src/airflow/executors/workloads.py
+++ b/airflow-core/src/airflow/executors/workloads.py
@@ -102,7 +102,11 @@ class ExecuteTask(BaseWorkload):
 
     @classmethod
     def make(
-        cls, ti: TIModel, dag_rel_path: Path | None = None, generator: JWTGenerator | None = None
+        cls,
+        ti: TIModel,
+        dag_rel_path: Path | None = None,
+        generator: JWTGenerator | None = None,
+        bundle_info: BundleInfo | None = None,
     ) -> ExecuteTask:
         from pathlib import Path
 
@@ -110,10 +114,11 @@ def make(
 
         ser_ti = TaskInstance.model_validate(ti, from_attributes=True)
         ser_ti.parent_context_carrier = ti.dag_run.context_carrier
-        bundle_info = BundleInfo(
-            name=ti.dag_model.bundle_name,
-            version=ti.dag_run.bundle_version,
-        )
+        if not bundle_info:
+            bundle_info = BundleInfo(
+                name=ti.dag_model.bundle_name,
+                version=ti.dag_run.bundle_version,
+            )
 
         fname = log_filename_template_renderer()(ti=ti)
         token = ""
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 5e6a5013c5e40..39e3440299135 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -29,6 +29,7 @@
 from contextlib import ExitStack
 from datetime import datetime, timedelta
 from functools import cache
+from pathlib import Path
 from re import Pattern
 from typing import (
     TYPE_CHECKING,
@@ -76,6 +77,7 @@
     UnknownExecutorException,
 )
 from airflow.executors.executor_loader import ExecutorLoader
+from airflow.executors.workloads import BundleInfo
 from airflow.models.asset import (
     AssetDagRunQueue,
     AssetModel,
@@ -235,10 +237,10 @@ def get_asset_triggered_next_run_info(
     }
 
 
-def _triggerer_is_healthy():
+def _triggerer_is_healthy(session: Session):
     from airflow.jobs.triggerer_job_runner import TriggererJobRunner
 
-    job = TriggererJobRunner.most_recent_job()
+    job = TriggererJobRunner.most_recent_job(session=session)
     return job and job.is_alive()
 
 
@@ -1715,7 +1717,7 @@ def add_logger_if_needed(ti: TaskInstance):
                     self.log.warning("No tasks to run. unrunnable tasks: %s", ids_unrunnable)
                     time.sleep(1)
 
-                triggerer_running = _triggerer_is_healthy()
+                triggerer_running = _triggerer_is_healthy(session)
 
                 for ti in scheduled_tis:
                     ti.task = tasks[ti.task_id]
@@ -1728,8 +1730,26 @@ def add_logger_if_needed(ti: TaskInstance):
                     if use_executor:
                         if executor.has_task(ti):
                             continue
-                        # Send the task to the executor
-                        executor.queue_task_instance(ti, ignore_ti_state=True)
+                        # TODO: Task-SDK: This check is transitional. Remove once all executors are ported over.
+                        from airflow.executors import workloads
+                        from airflow.executors.base_executor import BaseExecutor
+
+                        if executor.queue_workload.__func__ is not BaseExecutor.queue_workload:  # type: ignore[attr-defined]
+                            workload = workloads.ExecuteTask.make(
+                                ti,
+                                dag_rel_path=Path(self.fileloc),
+                                generator=executor.jwt_generator,
+                                # For system test/debug purposes we use the default bundle, which uses the
+                                # local file system. If this turns out to be a feature people want, we could
+                                # plumb the bundle to use as a parameter to dag.test
+                                bundle_info=BundleInfo(name="dags-folder"),
+                            )
+                            executor.queue_workload(workload, session=session)
+                            ti.state = TaskInstanceState.QUEUED
+                            session.commit()
+                        else:
+                            # Send the task to the executor
+                            executor.queue_task_instance(ti, ignore_ti_state=True)
                     else:
                         # Run the task locally
                         try:
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index 1dc8f9bf9f29e..2fe0b7de763c3 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -1014,7 +1014,7 @@ def system_tests(
         collect_only=collect_only,
         enable_coverage=enable_coverage,
         forward_credentials=forward_credentials,
-        forward_ports=False,
+        forward_ports=True,
         github_repository=github_repository,
         integration=(),
         keep_env_variables=keep_env_variables,
diff --git a/dev/breeze/src/airflow_breeze/utils/run_tests.py b/dev/breeze/src/airflow_breeze/utils/run_tests.py
index 48110d030a00d..80d30c5f3654e 100644
--- a/dev/breeze/src/airflow_breeze/utils/run_tests.py
+++ b/dev/breeze/src/airflow_breeze/utils/run_tests.py
@@ -391,6 +391,9 @@ def generate_args_for_pytest(
             args.append(f"--ignore={group_folder}")
     if test_group not in IGNORE_DB_INIT_FOR_TEST_GROUPS:
         args.append("--with-db-init")
+    if test_group == GroupOfTests.SYSTEM:
+        # For system tests, the DB is initialized when the API server is started
+        args.append("--without-db-init")
     if test_group == GroupOfTests.PYTHON_API_CLIENT:
         args.append("--ignore-glob=clients/python/tmp/*")
     args.extend(get_suspended_provider_args())
diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py
index f554b06de678f..2f0a416f47ff3 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -228,7 +228,13 @@ def pytest_addoption(parser: pytest.Parser):
         "--with-db-init",
         action="store_true",
         dest="db_init",
-        help="Forces database initialization before tests",
+        help="Forces database initialization before tests; even without it, a DB reset may still occur.",
+    )
+    group.addoption(
+        "--without-db-init",
+        action="store_true",
+        dest="no_db_init",
+        help="Forces NO database initialization before tests; takes precedence over --with-db-init.",
     )
     group.addoption(
         "--integration",
@@ -343,7 +349,7 @@ def initialize_airflow_tests(request):
 
     # Initialize Airflow db if required
     lock_file = os.path.join(airflow_home, ".airflow_db_initialised")
-    if not skip_db_tests:
+    if not skip_db_tests and not request.config.option.no_db_init:
         if request.config.option.db_init:
             from tests_common.test_utils.db import initial_db_init
 
diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh
index ce7a4d3e92959..fe4561bfaf445 100755
--- a/scripts/docker/entrypoint_ci.sh
+++ b/scripts/docker/entrypoint_ci.sh
@@ -350,7 +350,9 @@ function check_airflow_python_client_installation() {
 }
 
 function start_api_server_with_examples(){
-    if [[ ${START_API_SERVER_WITH_EXAMPLES=} != "true" ]]; then
+    # Skip starting the API server with examples unless START_API_SERVER_WITH_EXAMPLES
+    # is "true" or the TEST_GROUP env var equals "system"
+    if [[ ${START_API_SERVER_WITH_EXAMPLES=} != "true" && ${TEST_GROUP:=""} != "system" ]]; then
         return
     fi
     export AIRFLOW__CORE__LOAD_EXAMPLES=True
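
Reviewer note: the dag.test() hunk above dispatches on whether the executor class
overrides BaseExecutor.queue_workload, comparing functions via the bound method's
__func__ attribute. A minimal runnable sketch of that override-detection pattern
(the class and workload names below are illustrative stand-ins, not the real
Airflow classes):

    # Detect whether a subclass overrides a base-class method. A bound
    # method's __func__ attribute is the underlying plain function, so an
    # identity check against the base-class attribute reveals whether the
    # base implementation is still in use.
    class Base:
        def queue_workload(self, workload, session=None):
            raise NotImplementedError

    class PortedExecutor(Base):
        # Stands in for an executor already ported to the workloads API.
        def queue_workload(self, workload, session=None):
            print("queued:", workload)

    executor = PortedExecutor()
    if executor.queue_workload.__func__ is not Base.queue_workload:
        # Override detected: take the new workloads code path.
        executor.queue_workload("example-workload")  # prints "queued: example-workload"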