diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
index dbc2d0cf64cfd..8c6f8e3d23d71 100644
--- a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
+++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
@@ -263,7 +263,7 @@ def send_task_to_executor(
     if AIRFLOW_V_3_0_PLUS:
         if TYPE_CHECKING:
             assert isinstance(args, workloads.BaseWorkload)
-        args = (args.model_dump_json(),)
+        args = (args.model_dump_json(exclude={"ti": {"executor_config"}}),)
     try:
         with timeout(seconds=OPERATION_TIMEOUT):
             result = task_to_run.apply_async(args=args, queue=queue)
diff --git a/tests/integration/executors/test_celery_executor.py b/tests/integration/executors/test_celery_executor.py
index e1ece9b387bba..bdb0758080d75 100644
--- a/tests/integration/executors/test_celery_executor.py
+++ b/tests/integration/executors/test_celery_executor.py
@@ -36,6 +36,7 @@
 from celery.backends.database import DatabaseBackend
 from celery.contrib.testing.worker import start_worker
 from kombu.asynchronous import set_event_loop
+from kubernetes.client import models as k8s
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowTaskTimeout
@@ -137,7 +138,37 @@ def _change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=Non
 
     @pytest.mark.flaky(reruns=3)
     @pytest.mark.parametrize("broker_url", _prepare_test_bodies())
-    def test_celery_integration(self, broker_url):
+    @pytest.mark.parametrize(
+        "executor_config",
+        [
+            pytest.param({}, id="no_executor_config"),
+            pytest.param(
+                {
+                    "pod_override": k8s.V1Pod(
+                        spec=k8s.V1PodSpec(
+                            containers=[
+                                k8s.V1Container(
+                                    name="base",
+                                    resources=k8s.V1ResourceRequirements(
+                                        requests={
+                                            "cpu": "100m",
+                                            "memory": "384Mi",
+                                        },
+                                        limits={
+                                            "cpu": 1,
+                                            "memory": "500Mi",
+                                        },
+                                    ),
+                                )
+                            ]
+                        )
+                    )
+                },
+                id="pod_override_executor_config",
+            ),
+        ],
+    )
+    def test_celery_integration(self, broker_url, executor_config):
         from airflow.providers.celery.executors import celery_executor, celery_executor_utils
 
         def fake_execute_workload(command):
@@ -157,6 +188,7 @@ def fake_execute_workload(command):
             try_number=0,
             priority_weight=1,
             queue=celery_executor_utils.celery_configuration["task_default_queue"],
+            executor_config=executor_config,
         )
         keys = [
             TaskInstanceKey("id", "success", "abc", 0, -1),
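
Why the nested `exclude` is enough: an `executor_config` containing Kubernetes model objects such as `k8s.V1Pod` has no JSON representation, so serializing the workload as-is fails before the task ever reaches the broker. Below is a minimal standalone sketch of the pydantic v2 `exclude` semantics the change relies on; the `Workload` and `TaskInstance` models here are hypothetical stand-ins, not the real `airflow.executors.workloads` classes.

```python
from typing import Any

from pydantic import BaseModel
from pydantic_core import PydanticSerializationError


class TaskInstance(BaseModel):
    # Stand-in for the real workload model; executor_config may hold
    # arbitrary objects such as kubernetes.client.models.V1Pod.
    task_id: str
    executor_config: Any = None


class Workload(BaseModel):
    ti: TaskInstance


# object() plays the role of a V1Pod: a value json cannot encode.
workload = Workload(ti=TaskInstance(task_id="t1", executor_config=object()))

# Serializing the whole model fails on the unknown type.
try:
    workload.model_dump_json()
except PydanticSerializationError as exc:
    print(f"without exclude: {exc}")

# The nested exclude drops ti.executor_config before encoding, so the
# payload handed to Celery stays plain JSON.
print(workload.model_dump_json(exclude={"ti": {"executor_config"}}))
# -> {"ti":{"task_id":"t1"}}
```

Presumably the Celery worker has no use for `executor_config` (pod overrides matter to the Kubernetes side, as the test's `pod_override` case exercises), so omitting it from the broker payload loses nothing the worker needs.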