Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't crash scheduler if exec config has old k8s objects #24117

Merged
merged 1 commit into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,7 @@ def __repr__(self) -> str:

class TaskDeferralError(AirflowException):
"""Raised when a task failed during deferral for some reason."""


class PodReconciliationError(AirflowException):
    """Raised when pod configs cannot be merged (reconciled) into a single pod."""
12 changes: 10 additions & 2 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from kubernetes.client.rest import ApiException
from urllib3.exceptions import ReadTimeoutError

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, PodReconciliationError
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
Expand Down Expand Up @@ -300,8 +300,9 @@ def run_next(self, next_job: KubernetesJobType) -> None:
and store relevant info in the current_jobs map so we can track the job's
status
"""
self.log.info('Kubernetes job is %s', str(next_job).replace("\n", " "))
key, command, kube_executor_config, pod_template_file = next_job
self.log.info('Kubernetes job is %s', key)

dag_id, task_id, run_id, try_number, map_index = key

if command[0:3] != ["airflow", "tasks", "run"]:
Expand Down Expand Up @@ -617,6 +618,13 @@ def sync(self) -> None:
task = self.task_queue.get_nowait()
try:
self.kube_scheduler.run_next(task)
except PodReconciliationError as e:
self.log.error(
"Pod reconciliation failed, likely due to kubernetes library upgrade. "
"Try clearing the task to re-run.",
exc_info=True,
)
self.fail(task[0], e)
except ApiException as e:

# These codes indicate something is wrong with pod definition; otherwise we assume pod
Expand Down
7 changes: 5 additions & 2 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from kubernetes.client import models as k8s
from kubernetes.client.api_client import ApiClient

from airflow.exceptions import AirflowConfigException
from airflow.exceptions import AirflowConfigException, PodReconciliationError
from airflow.kubernetes.pod_generator_deprecated import PodDefaults, PodGenerator as PodGeneratorDeprecated
from airflow.utils import yaml
from airflow.version import version as airflow_version
Expand Down Expand Up @@ -389,7 +389,10 @@ def construct_pod(
# Pod from the pod_template_File -> Pod from executor_config arg -> Pod from the K8s executor
pod_list = [base_worker_pod, pod_override_object, dynamic_pod]

return reduce(PodGenerator.reconcile_pods, pod_list)
try:
return reduce(PodGenerator.reconcile_pods, pod_list)
except Exception as e:
raise PodReconciliationError from e

@staticmethod
def serialize_pod(pod: k8s.V1Pod) -> dict:
Expand Down
16 changes: 15 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,20 @@ def key(self) -> "TaskInstanceKey":
return self


def _executor_config_comparator(x, y):
"""
The TaskInstance.executor_config attribute is a pickled object that may contain
kubernetes objects. If the installed library version has changed since the
object was originally pickled, due to the underlying ``__eq__`` method on these
objects (which converts them to JSON), we may encounter attribute errors. In this
case we should replace the stored object.
"""
try:
return x == y
except AttributeError:
dstandish marked this conversation as resolved.
Show resolved Hide resolved
return False


class TaskInstance(Base, LoggingMixin):
"""
Task instances store the state of a task instance. This table is the
Expand Down Expand Up @@ -442,7 +456,7 @@ class TaskInstance(Base, LoggingMixin):
queued_dttm = Column(UtcDateTime)
queued_by_job_id = Column(Integer)
pid = Column(Integer)
executor_config = Column(PickleType(pickler=dill))
executor_config = Column(PickleType(pickler=dill, comparator=_executor_config_comparator))

external_executor_id = Column(String(ID_LEN, **COLLATION_ARGS))

Expand Down
39 changes: 39 additions & 0 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from urllib3 import HTTPResponse

from airflow import AirflowException
from airflow.exceptions import PodReconciliationError
from airflow.models.taskinstance import TaskInstanceKey
from airflow.operators.bash import BashOperator
from airflow.utils import timezone
Expand Down Expand Up @@ -272,6 +273,44 @@ def test_run_next_exception_requeue(
assert kubernetes_executor.task_queue.empty()
assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED

@pytest.mark.skipif(
    AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
)
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
def test_run_next_pod_reconciliation_error(self, mock_get_kube_client, mock_kubernetes_job_watcher):
    """
    When construct_pod raises PodReconciliationError, we should fail the task.

    In this test the error is injected through the fake kube client: its
    ``create_namespaced_pod`` attribute is a mock whose side effect is a
    ``PodReconciliationError``. After one ``sync()`` the test expects the task
    queue to be empty and the task's event buffer entry to be ``State.FAILED``
    with the injected error attached.
    """
    import sys

    # Pod template file handed to the executor through the config override below.
    path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'

    # NOTE(review): ``mock.patch(...)`` is neither started nor used as a context
    # manager here; the object it returns is used directly as the stand-in
    # client, with the attributes the test needs assigned onto it.
    mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
    fail_msg = 'test message'
    mock_kube_client.create_namespaced_pod = mock.MagicMock(side_effect=PodReconciliationError(fail_msg))
    mock_get_kube_client.return_value = mock_kube_client
    mock_api_client = mock.MagicMock()
    mock_api_client.sanitize_for_serialization.return_value = {}
    mock_kube_client.api_client = mock_api_client
    config = {('kubernetes', 'pod_template_file'): path}
    with conf_vars(config):
        kubernetes_executor = self.kubernetes_executor
        kubernetes_executor.start()
        # Execute a task while the Api Throws errors
        try_number = 1
        task_instance_key = TaskInstanceKey('dag', 'task', 'run_id', try_number)
        kubernetes_executor.execute_async(
            key=task_instance_key,
            queue=None,
            command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
        )
        kubernetes_executor.sync()

        # The task must have been consumed from the queue, not put back on it.
        assert kubernetes_executor.task_queue.empty()
        # The event buffer entry holds the failed state and the exception that caused it.
        assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
        assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg

@mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync')
@mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')
Expand Down
30 changes: 29 additions & 1 deletion tests/kubernetes/test_pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import sys
import uuid
from unittest import mock
from unittest.mock import MagicMock

import pytest
from dateutil import parser
from kubernetes.client import ApiClient, models as k8s
from parameterized import parameterized

from airflow import __version__
from airflow.exceptions import AirflowConfigException
from airflow.exceptions import AirflowConfigException, PodReconciliationError
from airflow.kubernetes.pod_generator import (
PodDefaults,
PodGenerator,
Expand Down Expand Up @@ -520,6 +521,33 @@ def test_construct_pod_empty_executor_config(self, mock_uuid):
worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config)
assert worker_config_result == sanitized_result

@mock.patch('uuid.uuid4')
def test_construct_pod_attribute_error(self, mock_uuid):
    """
    ``construct_pod`` must surface a PodReconciliationError.

    A kubernetes library upgrade can leave us with objects that raise
    AttributeError; a MagicMock configured with that side effect is passed
    as the pod override to stand in for such an object.
    """
    template_path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
    base_pod = PodGenerator.deserialize_model_file(template_path)
    mock_uuid.return_value = self.static_uuid
    broken_override = MagicMock(side_effect=AttributeError('error'))

    construct_kwargs = dict(
        dag_id='dag_id',
        task_id='task_id',
        pod_id='pod_id',
        kube_image='test-image',
        try_number=3,
        date=self.execution_date,
        args=['command'],
        pod_override_object=broken_override,
        base_worker_pod=base_pod,
        namespace='namespace',
        scheduler_job_id='uuid',
    )

    with pytest.raises(PodReconciliationError):
        PodGenerator.construct_pod(**construct_kwargs)

@mock.patch('uuid.uuid4')
def test_ensure_max_label_length(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
Expand Down
21 changes: 20 additions & 1 deletion tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
)
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskfail import TaskFail
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstance import TaskInstance, _executor_config_comparator
from airflow.models.taskmap import TaskMap
from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.operators.bash import BashOperator
Expand Down Expand Up @@ -2859,3 +2859,22 @@ def get_extra_env():

echo_task = dag.get_task("echo")
assert "get_extra_env" in echo_task.upstream_task_ids


def test_executor_config_comparator():
    """
    The comparator reports "not equal" when ``==`` itself raises AttributeError.

    That situation can arise when the executor config holds kubernetes objects
    that were pickled under an older kubernetes library version; a small
    stand-in class reproduces it here.
    """

    class RaisesOnEq:
        def __eq__(self, other):
            raise AttributeError('hello')

    broken = RaisesOnEq()

    # Sanity check: plain equality on the stand-in really does raise.
    with pytest.raises(AttributeError):
        assert broken == broken

    assert _executor_config_comparator('a', 'a') is True
    assert _executor_config_comparator(broken, broken) is False