-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
executable file
·86 lines (62 loc) · 2.04 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python
from flask import Flask
from kubernetes import client, config, watch
from argparse import ArgumentParser
# Command-line interface: one optional integer flag, --calc.
parser = ArgumentParser()
parser.add_argument("--calc", type=int)
# parse_known_args() returns (parsed, leftovers); arguments the parser does
# not recognise end up in the leftovers instead of causing an error.
args, _ = parser.parse_known_args()

# Identifiers used when building the Kubernetes Job in calc().
namespace = service_account = "fat"
name, image = "fat-calc", "fat:foo"
def calc(n):
    """Return the result for *n*, delegating the work to a Kubernetes Job.

    For ``n < 2`` the result is 1 and the cluster is never contacted.
    Otherwise a Job is created in ``namespace`` that runs ``image`` with the
    arguments ``--calc n``. The function then watches the Job's pods until
    one reaches the ``Succeeded`` phase, parses that pod's log as an integer,
    deletes the Job and returns the parsed value.

    Args:
        n: Integer input.

    Returns:
        1 when ``n < 2``; otherwise the integer printed by the Job's pod.

    Raises:
        RuntimeError: If no pod of the Job succeeds before the watch ends.
        ValueError: If the pod log cannot be parsed as an integer.
    """
    if n < 2:
        return 1

    config.load_incluster_config()
    core_api = client.CoreV1Api()
    batch_api = client.BatchV1Api()
    watch_api = watch.Watch()

    container = client.V1Container(
        name=name,
        image=image,
        args=["--calc", f"{n}"],
    )
    template = client.V1PodTemplateSpec(
        spec=client.V1PodSpec(
            restart_policy="Never",
            service_account_name=service_account,
            containers=[container],
        )
    )
    # NOTE(review): ttl_seconds_after_finished=0 presumably lets the cluster
    # remove the finished Job immediately, which could race with the log read
    # and the explicit delete below - confirm against the target cluster.
    job = client.V1Job(
        metadata=client.V1ObjectMeta(namespace=namespace, generate_name=f"{name}-"),
        spec=client.V1JobSpec(template=template, ttl_seconds_after_finished=0),
    )
    res = batch_api.create_namespaced_job(
        body=job,
        namespace=namespace,
    )
    # generate_name was used, so the actual name comes from the API response.
    job_name = res.metadata.name

    try:
        # Initialised up front so that a watch which ends without any
        # succeeded pod is reported explicitly instead of failing with an
        # UnboundLocalError further down.
        pod_name = None
        for event in watch_api.stream(
            func=core_api.list_namespaced_pod,
            namespace=namespace,
            label_selector=f"job-name={job_name}",
            timeout_seconds=1200,
        ):
            if event["object"].status.phase == "Succeeded":
                pod_name = event["object"].metadata.name
                watch_api.stop()
                break

        if not pod_name:
            raise RuntimeError(
                f"no pod of job {job_name} succeeded before the watch ended"
            )

        log = core_api.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
        )
        return int(log)
    finally:
        # Always remove the Job, including when waiting for the pod or
        # reading/parsing its log failed, so failed runs do not leave Jobs
        # behind.
        batch_api.delete_namespaced_job(
            job_name, namespace, propagation_policy="Background"
        )
# Entry point. With --calc N the process is a one-shot worker that prints
# N * calc(N - 1) and exits; without it, a Flask server is started that
# exposes calc() over HTTP.
# The flag is compared against None instead of tested for truthiness so that
# an explicit "--calc 0" still selects worker mode rather than silently
# starting the web server.
if args.calc is not None:
    print(args.calc * calc(args.calc - 1))
else:
    app = Flask(__name__)

    @app.route("/<int:n>/")
    def fat(n):
        """Return ``calc(n)`` formatted as a string for the response body."""
        return f"{calc(n)}"

    app.run(host="0.0.0.0")