Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e604714
remove init container from k8s and change to json string
davidsharp7 May 10, 2025
81eec42
Merge branch 'apache:main' into remove_init_container_from_k8
davidsharp7 May 10, 2025
ea12705
Merge branch 'apache:main' into remove_init_container_from_k8
davidsharp7 May 11, 2025
c7527af
update based on feedback
davidsharp7 May 13, 2025
361811d
Merge branch 'remove_init_container_from_k8' of https://github.com/da…
davidsharp7 May 13, 2025
35568c6
Merge branch 'apache:main' into remove_init_container_from_k8
davidsharp7 May 13, 2025
cdd3108
Merge branch 'apache:main' into remove_init_container_from_k8
davidsharp7 May 13, 2025
f7c9939
Merge branch 'apache:main' into remove_init_container_from_k8
davidsharp7 May 13, 2025
af85477
Merge branch 'apache:main' into remove_init_container_from_k8
davidsharp7 May 13, 2025
9e4c148
update var to list based on feedback
davidsharp7 May 14, 2025
9395483
Merge branch 'apache:main' into remove_init_container_from_k8
davidsharp7 May 14, 2025
170d083
Merge branch 'remove_init_container_from_k8' of https://github.com/da…
davidsharp7 May 14, 2025
1ac2b91
Merge branch 'apache:main' into remove_init_container_from_k8
davidsharp7 May 15, 2025
3d8552c
remove content_json
davidsharp7 May 15, 2025
1d14811
remove content_json executor
davidsharp7 May 15, 2025
b59eac0
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 15, 2025
f5365ca
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 15, 2025
64c3f3e
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 16, 2025
f282ff0
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 16, 2025
86687bb
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 16, 2025
dc1c494
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 17, 2025
a0a0091
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 17, 2025
51f55e9
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 17, 2025
862beb2
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 18, 2025
d1c1bbe
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 19, 2025
3c870ef
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 19, 2025
3cb350d
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 19, 2025
305c421
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 19, 2025
1a094e6
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 20, 2025
f83a318
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 20, 2025
5e82ea7
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 20, 2025
8239c6c
Merge branch 'main' into remove_init_container_from_k8
davidsharp7 May 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,8 @@ def run_next(self, next_job: KubernetesJobType) -> None:
"python",
"-m",
"airflow.sdk.execution_time.execute_workload",
"--json-path",
"/tmp/execute/input.json",
"--json-string",
ser_input,
]
else:
raise ValueError(
Expand Down Expand Up @@ -427,7 +427,6 @@ def run_next(self, next_job: KubernetesJobType) -> None:
date=None,
run_id=run_id,
args=list(command),
content_json_for_volume=ser_input,
pod_override_object=kube_executor_config,
base_worker_pod=base_worker_pod,
with_mutation_hook=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from typing import TYPE_CHECKING

from dateutil import parser
from kubernetes.client import V1EmptyDirVolumeSource, V1Volume, V1VolumeMount, models as k8s
from kubernetes.client import models as k8s
from kubernetes.client.api_client import ApiClient

from airflow.exceptions import (
Expand Down Expand Up @@ -287,7 +287,6 @@ def construct_pod(
scheduler_job_id: str,
run_id: str | None = None,
map_index: int = -1,
content_json_for_volume: str = "",
*,
with_mutation_hook: bool = False,
) -> k8s.V1Pod:
Expand Down Expand Up @@ -355,39 +354,6 @@ def construct_pod(
containers=[main_container],
)

if content_json_for_volume:
import shlex

input_file_path = "/tmp/execute/input.json"
execute_volume = V1Volume(
name="execute-volume",
empty_dir=V1EmptyDirVolumeSource(),
)

execute_volume_mount = V1VolumeMount(
name="execute-volume",
mount_path="/tmp/execute",
read_only=False,
)

escaped_json = shlex.quote(content_json_for_volume)
init_container = k8s.V1Container(
name="init-container",
image="busybox",
command=["/bin/sh", "-c", f"echo {escaped_json} > {input_file_path}"],
volume_mounts=[execute_volume_mount],
)

main_container.volume_mounts = [execute_volume_mount]
main_container.command = args[:-1]
main_container.args = args[-1:]

podspec = k8s.V1PodSpec(
containers=[main_container],
volumes=[execute_volume],
init_containers=[init_container],
)

dynamic_pod.spec = podspec

# Reconcile the pods starting with the first chronologically,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,7 @@ def setup_method(self):
'{"token":"mock","ti":{"id":"4d828a62-a417-4936-a7a6-2b3fabacecab","task_id":"mock","dag_id":"mock","run_id":"mock","try_number":1,"map_index":-1,"pool_slots":1,"queue":"default","priority_weight":1},"dag_rel_path":"mock.py","bundle_info":{"name":"n/a","version":"no matter"},"log_path":"mock.log","kind":"ExecuteTask"}',
'{"token":"mock","ti":{"id":"4d828a62-a417-4936-a7a6-2b3fabacecab","task_id":"mock","dag_id":"mock","run_id":"mock","try_number":1,"map_index":-1,"pool_slots":1,"queue":"default","priority_weight":1},"dag_rel_path":"mock.py","bundle_info":{"name":"n/a","version":"no matter"},"log_path":"mock.log","kind":"ExecuteTask"}',
id="regular-input",
),
pytest.param(
'{"token":"mock","ti":{"id":"4d828a62-a417-4936-a7a6-2b3fabacecab","task_id":"moc\'k","dag_id":"mock","run_id":"mock","try_number":1,"map_index":-1,"pool_slots":1,"queue":"default","priority_weight":1},"dag_rel_path":"mock.py","bundle_info":{"name":"n/a","version":"no matter"},"log_path":"mock.log","kind":"ExecuteTask"}',
'{"token":"mock","ti":{"id":"4d828a62-a417-4936-a7a6-2b3fabacecab","task_id":"moc\'"\'"\'k","dag_id":"mock","run_id":"mock","try_number":1,"map_index":-1,"pool_slots":1,"queue":"default","priority_weight":1},"dag_rel_path":"mock.py","bundle_info":{"name":"n/a","version":"no matter"},"log_path":"mock.log","kind":"ExecuteTask"}',
id="input-with-single-quote-in-task-id",
),
)
],
)
def test_pod_spec_for_task_sdk_runs(self, content_json, expected, data_file):
Expand All @@ -196,44 +191,25 @@ def test_pod_spec_for_task_sdk_runs(self, content_json, expected, data_file):
"python",
"-m",
"airflow.sdk.execution_time.execute_workload",
"--json-path",
"/tmp/execute/input.json",
"--json-string",
content_json,
],
pod_override_object=None,
base_worker_pod=worker_config,
namespace="namespace",
scheduler_job_id="uuid",
content_json_for_volume=content_json,
)
sanitized_result = self.k8s_client.sanitize_for_serialization(result)

init_containers = sanitized_result["spec"]["initContainers"]
assert len(init_containers) == 1
init_container = init_containers[0]
assert init_container == {
"command": [
"/bin/sh",
"-c",
f"echo '{expected}' > /tmp/execute/input.json",
],
"image": "busybox",
"name": "init-container",
"volumeMounts": [{"mountPath": "/tmp/execute", "name": "execute-volume", "readOnly": False}],
}

volumes = sanitized_result["spec"]["volumes"]
assert len(volumes) == 1
volume = volumes[0]
assert volume == {"emptyDir": {}, "name": "execute-volume"}

main_container = sanitized_result["spec"]["containers"][0]
assert main_container["command"] == [

assert main_container["args"] == [
"python",
"-m",
"airflow.sdk.execution_time.execute_workload",
"--json-path",
"--json-string",
expected,
]
assert main_container["args"] == ["/tmp/execute/input.json"]

def test_from_obj_pod_override_object(self):
obj = {
Expand Down