@task.kubernetes TaskFlow decorator fails with IndexError and is unable to receive input #28933

@vchiapaikeo

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes 5.0.0

Apache Airflow version

2.5.0

Operating System

Debian 11

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

When passing arguments (either args or kwargs) to a @task.kubernetes decorated function, the following exception occurs:

Task Logs:

[2023-01-13, 22:05:40 UTC] {kubernetes_pod.py:621} INFO - Building pod k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55 with labels: {'dag_id': 'test_k8s_input_1673647477', 'task_id': 'k8s_with_input', 'run_id': 'backfill__2023-01-01T0000000000-c16e0472d', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2023-01-13, 22:05:40 UTC] {kubernetes_pod.py:404} INFO - Found matching pod k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55 with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.5.0', 'dag_id': 'test_k8s_input_1673647477', 'kubernetes_pod_operator': 'True', 'run_id': 'backfill__2023-01-01T0000000000-c16e0472d', 'task_id': 'k8s_with_input', 'try_number': '1'}
[2023-01-13, 22:05:40 UTC] {kubernetes_pod.py:405} INFO - `try_number` of task_instance: 1
[2023-01-13, 22:05:40 UTC] {kubernetes_pod.py:406} INFO - `try_number` of pod: 1
[2023-01-13, 22:05:40 UTC] {pod_manager.py:189} WARNING - Pod not yet started: k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55
[2023-01-13, 22:05:41 UTC] {pod_manager.py:189} WARNING - Pod not yet started: k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55
[2023-01-13, 22:05:42 UTC] {pod_manager.py:189} WARNING - Pod not yet started: k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55
[2023-01-13, 22:05:43 UTC] {pod_manager.py:189} WARNING - Pod not yet started: k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO - + python -c 'import base64, os;x = os.environ["__PYTHON_SCRIPT"];f = open("/tmp/script.py", "w"); f.write(x); f.close()'
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO - + python /tmp/script.py
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO - Traceback (most recent call last):
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO -   File "/tmp/script.py", line 14, in <module>
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO -     with open(sys.argv[1], "rb") as file:
[2023-01-13, 22:05:44 UTC] {pod_manager.py:237} INFO - IndexError: list index out of range
[2023-01-13, 22:05:44 UTC] {kubernetes_pod.py:499} INFO - Deleting pod: k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55
[2023-01-13, 22:05:44 UTC] {taskinstance.py:1772} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes.py", line 104, in execute
    return super().execute(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 217, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 465, in execute
    self.cleanup(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 489, in cleanup
    raise AirflowException(
airflow.exceptions.AirflowException: Pod k8s-airflow-pod-5c285c340fdf4e-81721f4662e247e793f497ada2f1ce55 returned a failure:

What you think should happen instead

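The Kubernetes decorator should receive its input correctly. The python command invoked in the pod (python /tmp/script.py in the logs above) is not given any input, so the script's read of sys.argv[1] fails with the IndexError. Contrast this with the Docker version of the decorator, which does pass pickled input.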
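The failure mode can be reproduced outside Airflow. The following is a minimal sketch, not the provider's actual code: SCRIPT is a hypothetical stand-in for the generated /tmp/script.py, and the payload layout (a pickled dict with "args" and "kwargs" keys) and the helper name run_script are assumptions made for illustration. It runs the script once without an input path, as the pod does in the logs, and once with one.

```python
import pickle
import subprocess
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for the generated /tmp/script.py. It expects the path
# of a pickled {"args": ..., "kwargs": ...} payload as its first argument
# (the payload layout is an assumption for this sketch).
SCRIPT = '''
import pickle
import sys

with open(sys.argv[1], "rb") as file:
    arg_dict = pickle.load(file)

def k8s_with_input(val):
    print(f"Got val: {val}")
    return val

k8s_with_input(*arg_dict["args"], **arg_dict["kwargs"])
'''


def run_script(pass_input: bool) -> subprocess.CompletedProcess:
    """Run SCRIPT in a child interpreter, with or without the input file."""
    with tempfile.TemporaryDirectory() as tmp:
        script_path = Path(tmp) / "script.py"
        script_path.write_text(SCRIPT)

        # Pickle the call arguments to a file, as a caller that passes
        # input would have to do before invoking the script.
        input_path = Path(tmp) / "script.in"
        input_path.write_bytes(pickle.dumps({"args": ("a",), "kwargs": {}}))

        cmd = [sys.executable, str(script_path)]
        if pass_input:
            cmd.append(str(input_path))
        return subprocess.run(cmd, capture_output=True, text=True)


# Mirrors the pod's "python /tmp/script.py": no argument, so sys.argv[1] fails.
without_input = run_script(pass_input=False)
# Passing the pickled input path lets the function receive "a".
with_input = run_script(pass_input=True)

print(without_input.returncode, without_input.stderr.strip().splitlines()[-1])
print(with_input.returncode, with_input.stdout.strip())
```

The first run exits non-zero with the same IndexError seen in the task logs; the second prints the value it was given. How the input should actually reach the pod (a file, an environment variable, or something else) is left open here.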

How to reproduce

Create a DAG:

import os

from airflow import DAG
from airflow.decorators import task

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "start_date": "2022-12-16",
    "retries": 0,
}

@task.kubernetes(
    image="python:3.8-slim-buster",
    namespace=os.getenv("AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE"),
    in_cluster=False,
)
def k8s_with_input(val: str) -> str:
    import datetime

    print(f"Got val: {val}")
    return val


with DAG(
    schedule_interval="@daily",
    max_active_runs=1,
    max_active_tasks=5,
    catchup=False,
    dag_id="test_oom_dag",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    output = k8s_with_input.override(task_id="k8s_with_input")("a")

Run and observe failure:

Task logs above.

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
