
Scheduler race condition when used with the Kubernetes executor #57618


Description

@yengkhoo

Apache Airflow version

Other Airflow 2/3 version (please specify below)

If "Other Airflow 2/3 version" selected, which one?

3.0.4

What happened?

I'm upgrading from Airflow 2.11.0 to Airflow 3.0.4.
I'm deploying on EKS with Helm chart v1.18.0, and the backend DB is AWS RDS Postgres (db.t4g.large, engine version 15.12). I'm using a custom image that only installs extra libraries on top of the official one.

In Airflow 3, with the scheduler replica count set to 2, a task sometimes gets its TaskInstance created twice with different try_number values (one created by each scheduler) before the first try has even started running. Consequently, two worker pods are created and one of them fails with the invalid_state error below.

I also reproduced this on versions 3.0.6 and 3.1.1. I've checked that resource usage for the api-servers, schedulers and the database is still well below the limits.
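
The duplicated try can also be confirmed outside the scheduler logs, directly in the metadata DB. A minimal sketch, assuming the default Airflow 3 schema (task_instance plus task_instance_history) and a placeholder connection string:

import psycopg2

# Placeholder DSN; point this at the RDS instance that backs the metadata DB.
conn = psycopg2.connect("postgresql://airflow:***@<rds-endpoint>:5432/airflow")
with conn, conn.cursor() as cur:
    # A TI that received a new try while an earlier try of the same task had already
    # finished successfully is exactly the state the failed pod complains about
    # (previous_state=success).
    cur.execute(
        """
        SELECT ti.dag_id, ti.task_id, ti.run_id, ti.try_number, ti.state,
               h.try_number AS finished_try, h.state AS finished_state
        FROM task_instance ti
        JOIN task_instance_history h
          ON h.dag_id = ti.dag_id
         AND h.task_id = ti.task_id
         AND h.run_id = ti.run_id
         AND h.map_index = ti.map_index
        WHERE h.state = 'success'
        ORDER BY ti.dag_id, ti.task_id, ti.run_id, h.try_number
        """
    )
    for row in cur.fetchall():
        print(row)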

Extracted scheduler logs for one affected task (newest entries on top):
schedulers-log.csv

Example error log from the failed pod:

{"timestamp":"2025-10-25T02:49:26.475102Z","level":"info","event":"Executing workload","workload":"ExecuteTask(token='aaabbbbcccc', ti=TaskInstance(id=UUID('01994b35-8445-7dd3-bcd8-ecc39e0e448b'), task_id='spam_7.task_2', dag_id='spam_tasks_multiple_groups_dag', run_id='scheduled__2025-10-25T02:30:00+00:00', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=6, executor_config=None, parent_context_carrier={}, context_carrier={}, queued_dttm=None), dag_rel_path=PurePosixPath(''yeng-test/spam-dags.py'), bundle_info=BundleInfo(name='airflow-pipes', version='xx'), log_path='dag_id=spam_tasks_multiple_groups_dag/run_id=manual__2025-10-31T12:08:41+00:00/task_id=spam_7.task_2/attempt=1.log', type='ExecuteTask')","logger":"main"}
{"timestamp":"2025-10-25T02:49:27.001212Z","level":"info","event":"Connecting to server:","server":"http://airflow-api-server:8080/execution/","logger":"__main__"}
{"timestamp":"2025-10-25T02:49:27.068125Z","level":"info","event":"Secrets backends loaded for worker","count":1,"backend_classes":["EnvironmentVariablesBackend"],"logger":"supervisor"}
{"timestamp":"2025-10-25T02:49:27.095790Z","level":"warning","event":"Server error","detail":{"detail":{"reason":"invalid_state","message":"TI was not in a state where it could be marked as running","previous_state":"success"}},"logger":"airflow.sdk.api.client"}
{"timestamp":"2025-10-25T02:49:27.140185Z","level":"info","event":"Process exited","pid":14,"exit_code":-9,"signal_sent":"SIGKILL","logger":"supervisor"}
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py", line 125, in <module>
    main()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py", line 121, in main
    execute_workload(workload)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py", line 66, in execute_workload
    supervise(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 1829, in supervise
    process = ActivitySubprocess.start(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 933, in start
    proc._on_child_started(ti=what, dag_rel_path=dag_rel_path, bundle_info=bundle_info)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 944, in _on_child_started
    ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 152, in start
    resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 1218, in patch
    return self.request(
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 338, in wrapped_f
    return copy(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 477, in __call__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 378, in iter
    result = action(retry_state)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 400, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 480, in __call__
    result = fn(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 735, in request
    return super().request(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 825, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 914, in send
    response = self._send_handling_auth(
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 942, in _send_handling_auth
    response = self._send_handling_redirects(
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 999, in _send_handling_redirects
    raise exc
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 982, in _send_handling_redirects
    hook(response)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 117, in raise_on_4xx_5xx
    return get_json_error(response) or response.raise_for_status()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 113, in get_json_error
    raise err
airflow.sdk.api.client.ServerResponseError: Server returned error

What you think should happen instead?

With more than one scheduler replica, a task's retry TI should only be created after the previous try has failed.

How to reproduce

Helm chart values:

revisionHistoryLimit: 1
airflowVersion: 3.0.4
images:
  airflow:
    repository: my-custom-image
    tag: dev
    pullPolicy: IfNotPresent
  useDefaultImageForMigration: false
  migrationsWaitTimeout: 60
labels: ~
networkPolicies:
  enabled: false
securityContexts:
  pod:
    runAsUser: 50000
    runAsGroup: 0
    fsGroup: 50000
    fsGroupChangePolicy: "OnRootMismatch"
    runAsNonRoot: true
  containers:
    allowPrivilegeEscalation: false
    capabilities:
      drop:
        - ALL
    readOnlyRootFilesystem: true
# empty directory required to set readOnlyRootFilesystem
volumes:
  - name: tmp
    emptyDir: {}
  - name: opt
    emptyDir: {}
volumeMounts:
  - name: tmp
    mountPath: /tmp
  - name: opt
    mountPath: /opt/airflow
fernetKeySecretName: airflow-fernet-secret
env:
  - name: OPENLINEAGE_DISABLED
    value: "true"
config:
  core:
    auth_manager: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
    dagbag_import_timeout: 200
    parallelism: 200
  dag_processor:
    dag_file_processor_timeout: 300
    dag_bundle_config_list: "[{\"name\":\"airflow-dags\",\"classpath\":\"airflow.providers.git.bundles.git.GitDagBundle\",\"kwargs\":{\"subdir\":\"dags\",\"tracking_ref\":\"yeng-test-v3\",\"git_conn_id\":\"airflow-dags-dagbundle\",\"refresh_interval\":900}}]"
  database:
    sql_alchemy_pool_enabled: 'False'
    sql_alchemy_max_overflow: 20
    sql_alchemy_pool_size: 10
  kubernetes_executor:
    delete_worker_pods: true
  scheduler:
    parsing_processes: 4
  logging:
    remote_logging: 'True'
    remote_base_log_folder: s3://airflow-logs-test/
    # Connection name created to connect to S3.
    remote_log_conn_id: aws_s3_logging
    encrypt_s3_logs: False
  api:
    expose_config: 'True'
    base_url: 'https://airflow-sbx.mydomain.net'
ingress:
  apiServer:
    enabled: true
    annotations: ...
    hosts:
      - name: airflow-sbx.mydomain.net
    ingressClassName: alb
    pathType: "Prefix"
apiSecretKeySecretName: airflow-apiserver-secret
jwtSecretName: airflow-api-jwt-secret
webserver:
  defaultUser:
    enabled: false
apiServer:
  replicas: 2
  resources:
    limits:
      cpu: 3
      memory: 5Gi
    requests:
      cpu: 3
      memory: 5Gi
  topologySpreadConstraints:
    - maxSkew: 1
      topologyKey: "topology.kubernetes.io/zone"
      whenUnsatisfiable: ScheduleAnyway
      labelSelector:
        matchLabels:
          component: apiServer
    - maxSkew: 1
      topologyKey: "kubernetes.io/hostname"
      whenUnsatisfiable: ScheduleAnyway
      labelSelector:
        matchLabels:
          component: apiServer
  apiServerConfig: ~ # I am using Okta
  serviceAccount:
    annotations:
      eks.amazonaws.com/role-arn: arn:aws:iam::xxx:role/kubernetes/AirflowRole
scheduler:
  replicas: 2
  resources:
    limits:
      cpu: 1.5
      memory: 2.5Gi
    requests:
      cpu: 1.5
      memory: 2.5Gi
  livenessProbe:
    initialDelaySeconds: 90
    periodSeconds: 50
    timeoutSeconds: 50
    failureThreshold: 20
  topologySpreadConstraints:
    - maxSkew: 1
      topologyKey: "topology.kubernetes.io/zone"
      whenUnsatisfiable: ScheduleAnyway
      labelSelector:
        matchLabels:
          component: scheduler
    - maxSkew: 1
      topologyKey: "kubernetes.io/hostname"
      whenUnsatisfiable: ScheduleAnyway
      labelSelector:
        matchLabels:
          component: scheduler
  serviceAccount:
    annotations:
      eks.amazonaws.com/role-arn: arn:aws:iam::xxx:role/kubernetes/AirflowRole
workers:
  resources:
    limits:
      cpu: 1000m
      memory: 1024Mi
    requests:
      cpu: 1000m
      memory: 1024Mi
  logGroomerSidecar:
    resources:
      limits:
        cpu: 1
        memory: 1Gi
      requests:
        cpu: 1
        memory: 1Gi
  securityContexts:
    pod:
      runAsUser: 50000
      runAsGroup: 0
      fsGroup: 50000
      fsGroupChangePolicy: "OnRootMismatch"
      runAsNonRoot: true
    container:
      allowPrivilegeEscalation: false
      capabilities:
        drop:
          - ALL
      readOnlyRootFilesystem: false
  serviceAccount:
    annotations:
      eks.amazonaws.com/role-arn: arn:aws:iam::xxx:role/kubernetes/AirflowRole
podTemplate: ~
dagProcessor:
  enabled: true
  replicas: 1
executor: "KubernetesExecutor"
allowPodLaunching: true
dags:
  persistence:
    enabled: false
logs:
  persistence:
    enabled: false
redis:
  enabled: false
postgresql:
  enabled: false
createUserJob:
  useHelmHooks: false
  applyCustomEnv: false
migrateDatabaseJob:
  enabled: true
  useHelmHooks: false
  applyCustomEnv: false
  jobAnnotations:
    "argocd.argoproj.io/hook": Sync
data:
  metadataSecretName: airflow-pgbouncer-secret
pgbouncer:
  enabled: true
  replicas: 1
  revisionHistoryLimit: 1
  sslmode: "disable"
  ciphers: "normal"
  auth_type: md5
  command: ["pgbouncer", "-u", "nobody", "/etc/pgbouncer-config/pgbouncer.ini"]
  configSecretName: airflow-pgbouncer-ini-custom-secret 
  extraVolumes: ...
  resources:
    limits:
      cpu: 1
      memory: 1Gi
    requests:
      cpu: 1
      memory: 1Gi

A DAG that can reproduce the issue. After triggering it, tasks get scheduled and run as usual, but after a while failed worker pods start showing up (a sketch for spotting them from the Kubernetes side follows the DAG code).

from airflow.sdk import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
import time


# Task function
def sleep_task(task_id):
    print(f"Starting {task_id} ...")
    time.sleep(30)
    print(f"Finished {task_id}.")


# Default arguments
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 4,
}

with DAG(
    dag_id="spam_tasks_multiple_groups_dag",
    default_args=default_args,
    description="DAG with TaskGroups, each running multiple parallel tasks",
    schedule=None, 
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["test", "parallel", "taskgroup", "load"],
) as dag:

    start = PythonOperator(
        task_id="start",
        python_callable=lambda: print("Starting DAG run"),
    )

    # Create task groups dynamically
    task_groups = []
    for g in range(1, 10):  
        with TaskGroup(group_id=f"spam_{g}") as tg:
            for i in range(1, 6): 
                PythonOperator(
                    task_id=f"task_{i}",
                    python_callable=sleep_task,
                    op_args=[f"grp{g}_task{i}"],
                )
        task_groups.append(tg)

    end = PythonOperator(
        task_id="end",
        python_callable=lambda: print("All task groups completed."),
    )

    # Define DAG structure: start → all groups (in parallel) → end
    start >> task_groups >> end
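
While the DAG is running, the duplicated worker pods can also be spotted from the Kubernetes side. A short sketch, assuming the default labels/annotations the KubernetesExecutor puts on worker pods (the kubernetes_executor label plus dag_id/task_id/run_id/try_number annotations) and a placeholder namespace:

from collections import defaultdict
from kubernetes import client, config

config.load_kube_config()  # use load_incluster_config() when running inside the cluster

# List worker pods; label and annotation names are the executor defaults (assumption).
pods = client.CoreV1Api().list_namespaced_pod(
    namespace="airflow", label_selector="kubernetes_executor=True"
).items

by_ti = defaultdict(list)
for pod in pods:
    ann = pod.metadata.annotations or {}
    key = (ann.get("dag_id"), ann.get("task_id"), ann.get("run_id"))
    by_ti[key].append((pod.metadata.name, ann.get("try_number"), pod.status.phase))

# Any task instance with more than one pod at the same time is a hit.
for key, ti_pods in by_ti.items():
    if len(ti_pods) > 1:
        print(key, ti_pods)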


Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers (output of pip freeze | grep apache-airflow-providers)

apache-airflow-providers-amazon==9.15.0
apache-airflow-providers-celery==3.12.4
apache-airflow-providers-cncf-kubernetes==10.8.2
apache-airflow-providers-fab==3.0.0
apache-airflow-providers-postgres==6.3.0
apache-airflow-providers-standard==1.9.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
