Skip to content

Commit

Permalink
Don't crash scheduler if exec config has old k8s objects (#24117)
Browse files Browse the repository at this point in the history
From time to time k8s library objects change their attrs.  If executor config is stored with old version, and unpickled with new version, we can get attribute errors that can crash the scheduler (see apache/airflow#23727).

Here we update handling so that we fail the task but don't crash the scheduler.

(cherry picked from commit 0c41f437674f135fe7232a368bf9c198b0ecd2f0)

GitOrigin-RevId: b1be02473b2ad04dde8d1268a47f18a22eb89faa
  • Loading branch information
dstandish authored and Cloud Composer Team committed Dec 7, 2022
1 parent 71c41fe commit 0a1a9f5
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 7 deletions.
4 changes: 4 additions & 0 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,7 @@ def __repr__(self) -> str:

class TaskDeferralError(AirflowException):
"""Raised when a task failed during deferral for some reason."""


class PodReconciliationError(AirflowException):
"""Raised when an error is encountered while trying to merge pod configs."""
12 changes: 10 additions & 2 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from kubernetes.client.rest import ApiException
from urllib3.exceptions import ReadTimeoutError

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, PodReconciliationError
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
Expand Down Expand Up @@ -300,8 +300,9 @@ def run_next(self, next_job: KubernetesJobType) -> None:
and store relevant info in the current_jobs map so we can track the job's
status
"""
self.log.info('Kubernetes job is %s', str(next_job).replace("\n", " "))
key, command, kube_executor_config, pod_template_file = next_job
self.log.info('Kubernetes job is %s', key)

dag_id, task_id, run_id, try_number, map_index = key

if command[0:3] != ["airflow", "tasks", "run"]:
Expand Down Expand Up @@ -617,6 +618,13 @@ def sync(self) -> None:
task = self.task_queue.get_nowait()
try:
self.kube_scheduler.run_next(task)
except PodReconciliationError as e:
self.log.error(
"Pod reconciliation failed, likely due to kubernetes library upgrade. "
"Try clearing the task to re-run.",
exc_info=True,
)
self.fail(task[0], e)
except ApiException as e:

# These codes indicate something is wrong with pod definition; otherwise we assume pod
Expand Down
7 changes: 5 additions & 2 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from kubernetes.client import models as k8s
from kubernetes.client.api_client import ApiClient

from airflow.exceptions import AirflowConfigException
from airflow.exceptions import AirflowConfigException, PodReconciliationError
from airflow.kubernetes.pod_generator_deprecated import PodDefaults, PodGenerator as PodGeneratorDeprecated
from airflow.utils import yaml
from airflow.version import version as airflow_version
Expand Down Expand Up @@ -389,7 +389,10 @@ def construct_pod(
# Pod from the pod_template_File -> Pod from executor_config arg -> Pod from the K8s executor
pod_list = [base_worker_pod, pod_override_object, dynamic_pod]

return reduce(PodGenerator.reconcile_pods, pod_list)
try:
return reduce(PodGenerator.reconcile_pods, pod_list)
except Exception as e:
raise PodReconciliationError from e

@staticmethod
def serialize_pod(pod: k8s.V1Pod) -> dict:
Expand Down
16 changes: 15 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,20 @@ def key(self) -> "TaskInstanceKey":
return self


def _executor_config_comparator(x, y):
"""
The TaskInstance.executor_config attribute is a pickled object that may contain
kubernetes objects. If the installed library version has changed since the
object was originally pickled, due to the underlying ``__eq__`` method on these
objects (which converts them to JSON), we may encounter attribute errors. In this
case we should replace the stored object.
"""
try:
return x == y
except AttributeError:
return False


class TaskInstance(Base, LoggingMixin):
"""
Task instances store the state of a task instance. This table is the
Expand Down Expand Up @@ -470,7 +484,7 @@ class TaskInstance(Base, LoggingMixin):
queued_dttm = Column(UtcDateTime)
queued_by_job_id = Column(Integer)
pid = Column(Integer)
executor_config = Column(PickleType(pickler=dill))
executor_config = Column(PickleType(pickler=dill, comparator=_executor_config_comparator))

external_executor_id = Column(String(ID_LEN, **COLLATION_ARGS))

Expand Down
39 changes: 39 additions & 0 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from urllib3 import HTTPResponse

from airflow import AirflowException
from airflow.exceptions import PodReconciliationError
from airflow.models.taskinstance import TaskInstanceKey
from airflow.operators.bash import BashOperator
from airflow.utils import timezone
Expand Down Expand Up @@ -272,6 +273,44 @@ def test_run_next_exception_requeue(
assert kubernetes_executor.task_queue.empty()
assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED

@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
)
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
def test_run_next_pod_reconciliation_error(self, mock_get_kube_client, mock_kubernetes_job_watcher):
"""
When construct_pod raises PodReconciliationError, we should fail the task.
"""
import sys

path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'

mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
fail_msg = 'test message'
mock_kube_client.create_namespaced_pod = mock.MagicMock(side_effect=PodReconciliationError(fail_msg))
mock_get_kube_client.return_value = mock_kube_client
mock_api_client = mock.MagicMock()
mock_api_client.sanitize_for_serialization.return_value = {}
mock_kube_client.api_client = mock_api_client
config = {('kubernetes', 'pod_template_file'): path}
with conf_vars(config):
kubernetes_executor = self.kubernetes_executor
kubernetes_executor.start()
# Execute a task while the Api Throws errors
try_number = 1
task_instance_key = TaskInstanceKey('dag', 'task', 'run_id', try_number)
kubernetes_executor.execute_async(
key=task_instance_key,
queue=None,
command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
)
kubernetes_executor.sync()

assert kubernetes_executor.task_queue.empty()
assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg

@mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync')
@mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')
Expand Down
30 changes: 29 additions & 1 deletion tests/kubernetes/test_pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import sys
import uuid
from unittest import mock
from unittest.mock import MagicMock

import pytest
from dateutil import parser
from kubernetes.client import ApiClient, models as k8s
from parameterized import parameterized

from airflow import __version__
from airflow.exceptions import AirflowConfigException
from airflow.exceptions import AirflowConfigException, PodReconciliationError
from airflow.kubernetes.pod_generator import (
PodDefaults,
PodGenerator,
Expand Down Expand Up @@ -520,6 +521,33 @@ def test_construct_pod_empty_executor_config(self, mock_uuid):
worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config)
assert worker_config_result == sanitized_result

@mock.patch('uuid.uuid4')
def test_construct_pod_attribute_error(self, mock_uuid):
"""
After upgrading k8s library we might get attribute error.
In this case it should raise PodReconciliationError
"""
path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
worker_config = PodGenerator.deserialize_model_file(path)
mock_uuid.return_value = self.static_uuid
executor_config = MagicMock()
executor_config.side_effect = AttributeError('error')

with pytest.raises(PodReconciliationError):
PodGenerator.construct_pod(
dag_id='dag_id',
task_id='task_id',
pod_id='pod_id',
kube_image='test-image',
try_number=3,
date=self.execution_date,
args=['command'],
pod_override_object=executor_config,
base_worker_pod=worker_config,
namespace='namespace',
scheduler_job_id='uuid',
)

@mock.patch('uuid.uuid4')
def test_ensure_max_label_length(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
Expand Down
26 changes: 25 additions & 1 deletion tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@
XCom,
)
from airflow.models.taskfail import TaskFail
from airflow.models.taskinstance import TaskInstance, load_error_file, set_error_file
from airflow.models.taskinstance import (
TaskInstance,
_executor_config_comparator,
load_error_file,
set_error_file,
)
from airflow.models.taskmap import TaskMap
from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.operators.bash import BashOperator
Expand Down Expand Up @@ -2868,3 +2873,22 @@ def get_extra_env():

echo_task = dag.get_task("echo")
assert "get_extra_env" in echo_task.upstream_task_ids


def test_executor_config_comparator():
"""
When comparison raises AttributeError, return False.
This can happen when executor config contains kubernetes objects pickled
under older kubernetes library version.
"""

class MockAttrError:
def __eq__(self, other):
raise AttributeError('hello')

a = MockAttrError()
with pytest.raises(AttributeError):
# just verify for ourselves that this throws
assert a == a
assert _executor_config_comparator(a, a) is False
assert _executor_config_comparator('a', 'a') is True

0 comments on commit 0a1a9f5

Please sign in to comment.