(fix): HybridExecutor tasks of other executor rescheduled in kubernetes executor #43003

Merged · 3 commits · Oct 20, 2024
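The change makes KubernetesExecutor.clear_not_launched_queued_tasks skip queued task instances that belong to another executor in a multi-executor (hybrid) deployment, so those tasks are no longer cleared and rescheduled by the wrong executor. A minimal sketch of the selection rule the diff below implements (an illustrative helper, not the PR's code; ti stands for any object with queue and executor attributes):

KUBERNETES_EXECUTOR = "KubernetesExecutor"


def is_cleanup_candidate(ti, kubernetes_queue=None, default_executor=KUBERNETES_EXECUTOR, hybrid_executors=True):
    """Should the KubernetesExecutor consider this queued TI for rescheduling?"""
    if kubernetes_queue is not None:
        # CeleryKubernetesExecutor-style routing: select by queue only.
        return ti.queue == kubernetes_queue
    if not hybrid_executors:
        # Older Airflow without TaskInstance.executor: keep the previous behaviour and consider every queued TI.
        return True
    if default_executor == KUBERNETES_EXECUTOR:
        # TIs with no explicit executor fall back to the default executor.
        return ti.executor in (None, KUBERNETES_EXECUTOR)
    # Another executor is the default: only TIs explicitly pinned to the k8s executor qualify.
    return ti.executor == KUBERNETES_EXECUTOR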
@@ -36,7 +36,7 @@
 from typing import TYPE_CHECKING, Any, Sequence
 
 from kubernetes.dynamic import DynamicClient
-from sqlalchemy import select, update
+from sqlalchemy import or_, select, update
 
 from airflow.cli.cli_config import (
     ARG_DAG_ID,
@@ -52,6 +52,7 @@
 )
 from airflow.configuration import conf
 from airflow.executors.base_executor import BaseExecutor
+from airflow.executors.executor_constants import KUBERNETES_EXECUTOR
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
     ADOPTED,
     POD_EXECUTOR_DONE_KEY,
@@ -229,13 +230,30 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> None:
         assert self.kube_client
         from airflow.models.taskinstance import TaskInstance
 
+        hybrid_executor_enabled = hasattr(TaskInstance, "executor")
+        default_executor = None
+        if hybrid_executor_enabled:
+            from airflow.executors.executor_loader import ExecutorLoader
+
+            default_executor = str(ExecutorLoader.get_default_executor_name())
+
         with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"):
             self.log.debug("Clearing tasks that have not been launched")
             query = select(TaskInstance).where(
-                TaskInstance.state == TaskInstanceState.QUEUED, TaskInstance.queued_by_job_id == self.job_id
+                TaskInstance.state == TaskInstanceState.QUEUED,
+                TaskInstance.queued_by_job_id == self.job_id,
             )
             if self.kubernetes_queue:
                 query = query.where(TaskInstance.queue == self.kubernetes_queue)
+            elif hybrid_executor_enabled and KUBERNETES_EXECUTOR == default_executor:
+                query = query.where(
+                    or_(
+                        TaskInstance.executor == KUBERNETES_EXECUTOR,
+                        TaskInstance.executor.is_(None),
+                    ),
+                )
+            elif hybrid_executor_enabled:
+                query = query.where(TaskInstance.executor == KUBERNETES_EXECUTOR)
             queued_tis: list[TaskInstance] = session.scalars(query).all()
             self.log.info("Found %s queued task instances", len(queued_tis))
 
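The default-executor branch above relies on SQLAlchemy's or_ combined with ColumnOperators.is_(None), the explicit way to express SQL IS NULL, so rows whose executor column was never set still match when the KubernetesExecutor is the default. A self-contained sketch of that pattern, using an illustrative TI table rather than Airflow's models:

from sqlalchemy import Column, Integer, String, create_engine, or_, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TI(Base):
    # Hypothetical stand-in for the task instance table.
    __tablename__ = "ti"
    id = Column(Integer, primary_key=True)
    executor = Column(String, nullable=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([TI(executor="KubernetesExecutor"), TI(executor="CeleryExecutor"), TI(executor=None)])
    session.commit()
    query = select(TI).where(or_(TI.executor == "KubernetesExecutor", TI.executor.is_(None)))
    matched = {ti.executor for ti in session.scalars(query)}
    # The NULL row matches via .is_(None); the Celery row is excluded.
    assert matched == {"KubernetesExecutor", None}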
213 changes: 213 additions & 0 deletions providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -29,6 +29,12 @@
from urllib3 import HTTPResponse

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
KUBERNETES_EXECUTOR,
)
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes import pod_generator
@@ -1277,6 +1283,7 @@ def test_kube_config_get_namespace_list(

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_not_launched(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
@@ -1287,6 +1294,13 @@ def test_clear_not_launched_queued_tasks_not_launched(
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

# This is a hack to pick up the overridden conf vars, since executors seem to be loaded before the conf override takes effect.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

@@ -1320,6 +1334,7 @@ def test_clear_not_launched_queued_tasks_not_launched(
],
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_launched(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue
):
@@ -1350,6 +1365,13 @@ def test_clear_not_launched_queued_tasks_launched(
]
)

# This is a hack to pick up the overridden conf vars, since executors seem to be loaded before the conf override takes effect.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

@@ -1376,6 +1398,7 @@ def test_clear_not_launched_queued_tasks_launched(

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_mapped_task(self, mock_kube_dynamic_client, dag_maker, session):
"""One mapped task has a launched pod - other does not."""

@@ -1410,6 +1433,13 @@ def get(*args, **kwargs):
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.side_effect = get

# This is a hack to pick up the overridden conf vars, since executors seem to be loaded before the conf override takes effect.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
with dag_maker(dag_id="test_clear"):
op = BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"])

@@ -1443,13 +1473,21 @@ def get(*args, **kwargs):
)

@pytest.mark.db_test
@conf_vars({("core", "executor"): CELERY_KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_not_launched_other_queue(
self, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])

# This is a hack to pick up the overridden conf vars, since executors seem to be loaded before the conf override takes effect.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

@@ -1470,7 +1508,175 @@ def test_clear_not_launched_queued_tasks_not_launched_other_queue(
assert mock_kube_client.list_namespaced_pod.call_count == 0

@pytest.mark.db_test
@pytest.mark.skipif(
not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_not_launched_other_executor(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

# This is a hack to pick up the overridden conf vars, since executors seem to be loaded before the conf override takes effect.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
ti.executor = "CeleryExecutor"
session.flush()

executor = self.kubernetes_executor
executor.job_id = 1

executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

ti.refresh_from_db()
assert ti.executor == "CeleryExecutor"
assert ti.state == State.QUEUED
assert mock_kube_client.list_namespaced_pod.call_count == 0

@pytest.mark.db_test
@pytest.mark.skipif(
not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): CELERY_EXECUTOR})
def test_clear_not_launched_queued_tasks_not_launched_other_default_executor(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

# This is a hack to pick up the overridden conf vars, since executors seem to be loaded before the conf override takes effect.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
session.flush()

executor = self.kubernetes_executor
executor.job_id = 1

executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

ti.refresh_from_db()
assert ti.state == State.QUEUED
assert mock_kube_client.list_namespaced_pod.call_count == 0

@pytest.mark.db_test
@pytest.mark.skipif(
not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_launched_none_executor(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

# This is a hack to pick up the overridden conf vars, since executors seem to be loaded before the conf override takes effect.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
session.flush()

executor = self.kubernetes_executor
executor.job_id = 1

executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

ti.refresh_from_db()
assert ti.state == State.SCHEDULED
assert mock_kube_dynamic_client.return_value.get.call_count == 1

@pytest.mark.db_test
@pytest.mark.skipif(
not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_launched_kubernetes_executor(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

# This is a hack to pick up the overridden conf vars, since executors seem to be loaded before the conf override takes effect.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
ti.executor = KUBERNETES_EXECUTOR
session.flush()

executor = self.kubernetes_executor
executor.job_id = 1

executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

ti.refresh_from_db()
assert ti.state == State.SCHEDULED
assert mock_kube_dynamic_client.return_value.get.call_count == 1

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_clear_only_by_job_id(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
@@ -1479,6 +1685,13 @@ def test_clear_not_launched_queued_tasks_clear_only_by_job_id(
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(items=[])

# This is a hack to pick up the overridden conf vars, since executors seem to be loaded before the conf override takes effect.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear_0", task_id="task0", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

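For context, the scenario these tests cover is a hybrid deployment where individual tasks pin an executor. A minimal sketch of such a DAG, assuming Airflow 2.10+ where operators accept an executor argument (the DAG id, schedule, and task ids are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="hybrid_executor_example", start_date=datetime(2024, 1, 1), schedule=None):
    # Runs on the configured default executor (KubernetesExecutor in these tests).
    on_default = EmptyOperator(task_id="on_default")
    # Pinned to Celery; before this fix, a KubernetesExecutor job could wrongly
    # clear and reschedule this task while it sat in the queued state.
    on_celery = EmptyOperator(task_id="on_celery", executor="CeleryExecutor")
    on_default >> on_celery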