diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index a8d580d3950ff..b6c2b097cdf58 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -398,8 +398,8 @@ def run_next(self, next_job: KubernetesJobType) -> None: "python", "-m", "airflow.sdk.execution_time.execute_workload", - "--json-path", - "/tmp/execute/input.json", + "--json-string", + ser_input, ] else: raise ValueError( @@ -427,7 +427,6 @@ def run_next(self, next_job: KubernetesJobType) -> None: date=None, run_id=run_id, args=list(command), - content_json_for_volume=ser_input, pod_override_object=kube_executor_config, base_worker_pod=base_worker_pod, with_mutation_hook=True, diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py index e91353631ae0b..ee0378e9ce1b0 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py @@ -34,7 +34,7 @@ from typing import TYPE_CHECKING from dateutil import parser -from kubernetes.client import V1EmptyDirVolumeSource, V1Volume, V1VolumeMount, models as k8s +from kubernetes.client import models as k8s from kubernetes.client.api_client import ApiClient from airflow.exceptions import ( @@ -287,7 +287,6 @@ def construct_pod( scheduler_job_id: str, run_id: str | None = None, map_index: int = -1, - content_json_for_volume: str = "", *, with_mutation_hook: bool = False, ) -> k8s.V1Pod: @@ -355,39 +354,6 @@ def construct_pod( containers=[main_container], ) - if content_json_for_volume: - import shlex - - input_file_path = "/tmp/execute/input.json" - execute_volume = V1Volume( - name="execute-volume", - empty_dir=V1EmptyDirVolumeSource(), - ) - - execute_volume_mount = V1VolumeMount( - name="execute-volume", - mount_path="/tmp/execute", - read_only=False, - ) - - escaped_json = shlex.quote(content_json_for_volume) - init_container = k8s.V1Container( - name="init-container", - image="busybox", - command=["/bin/sh", "-c", f"echo {escaped_json} > {input_file_path}"], - volume_mounts=[execute_volume_mount], - ) - - main_container.volume_mounts = [execute_volume_mount] - main_container.command = args[:-1] - main_container.args = args[-1:] - - podspec = k8s.V1PodSpec( - containers=[main_container], - volumes=[execute_volume], - init_containers=[init_container], - ) - dynamic_pod.spec = podspec # Reconcile the pods starting with the first chronologically, diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py index efb826b3ea6de..af55ae3f9395b 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py @@ -174,12 +174,7 @@ def setup_method(self): '{"token":"mock","ti":{"id":"4d828a62-a417-4936-a7a6-2b3fabacecab","task_id":"mock","dag_id":"mock","run_id":"mock","try_number":1,"map_index":-1,"pool_slots":1,"queue":"default","priority_weight":1},"dag_rel_path":"mock.py","bundle_info":{"name":"n/a","version":"no matter"},"log_path":"mock.log","kind":"ExecuteTask"}', '{"token":"mock","ti":{"id":"4d828a62-a417-4936-a7a6-2b3fabacecab","task_id":"mock","dag_id":"mock","run_id":"mock","try_number":1,"map_index":-1,"pool_slots":1,"queue":"default","priority_weight":1},"dag_rel_path":"mock.py","bundle_info":{"name":"n/a","version":"no matter"},"log_path":"mock.log","kind":"ExecuteTask"}', id="regular-input", - ), - pytest.param( - '{"token":"mock","ti":{"id":"4d828a62-a417-4936-a7a6-2b3fabacecab","task_id":"moc\'k","dag_id":"mock","run_id":"mock","try_number":1,"map_index":-1,"pool_slots":1,"queue":"default","priority_weight":1},"dag_rel_path":"mock.py","bundle_info":{"name":"n/a","version":"no matter"},"log_path":"mock.log","kind":"ExecuteTask"}', - '{"token":"mock","ti":{"id":"4d828a62-a417-4936-a7a6-2b3fabacecab","task_id":"moc\'"\'"\'k","dag_id":"mock","run_id":"mock","try_number":1,"map_index":-1,"pool_slots":1,"queue":"default","priority_weight":1},"dag_rel_path":"mock.py","bundle_info":{"name":"n/a","version":"no matter"},"log_path":"mock.log","kind":"ExecuteTask"}', - id="input-with-single-quote-in-task-id", - ), + ) ], ) def test_pod_spec_for_task_sdk_runs(self, content_json, expected, data_file): @@ -196,44 +191,25 @@ def test_pod_spec_for_task_sdk_runs(self, content_json, expected, data_file): "python", "-m", "airflow.sdk.execution_time.execute_workload", - "--json-path", - "/tmp/execute/input.json", + "--json-string", + content_json, ], pod_override_object=None, base_worker_pod=worker_config, namespace="namespace", scheduler_job_id="uuid", - content_json_for_volume=content_json, ) sanitized_result = self.k8s_client.sanitize_for_serialization(result) - init_containers = sanitized_result["spec"]["initContainers"] - assert len(init_containers) == 1 - init_container = init_containers[0] - assert init_container == { - "command": [ - "/bin/sh", - "-c", - f"echo '{expected}' > /tmp/execute/input.json", - ], - "image": "busybox", - "name": "init-container", - "volumeMounts": [{"mountPath": "/tmp/execute", "name": "execute-volume", "readOnly": False}], - } - - volumes = sanitized_result["spec"]["volumes"] - assert len(volumes) == 1 - volume = volumes[0] - assert volume == {"emptyDir": {}, "name": "execute-volume"} - main_container = sanitized_result["spec"]["containers"][0] - assert main_container["command"] == [ + + assert main_container["args"] == [ "python", "-m", "airflow.sdk.execution_time.execute_workload", - "--json-path", + "--json-string", + expected, ] - assert main_container["args"] == ["/tmp/execute/input.json"] def test_from_obj_pod_override_object(self): obj = {