-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
88 lines (75 loc) · 2.75 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from cheroot.server import HTTPServer
from cheroot.ssl.builtin import BuiltinSSLAdapter
import json
import web
import os
urls = (
'/validate', 'validate'
)
if 'DEBUG' in os.environ.keys():
web.config.debug = True
debug = True
else:
web.config.debug = False
debug = False
if 'SEMAPHORE_ANNOTATION' in os.environ:
semaphore_annotation = os.environ['SEMAPHORE_ANNOTATION']
else:
semaphore_annotation = 'bertera.it/k8s-semaphore'
if 'SEMAPHORE_RED' in os.environ:
semaphore_red = os.environ['SEMAPHORE_RED']
else:
semaphore_red = 'red'
class validate:
def POST(self):
request = json.loads(web.data())
if debug:
print("/validate REQ: %s" % request)
uid = request['request']['uid']
annotations = request['request']['oldObject']['metadata']['annotations']
resource_name = request['request']['name']
resource_kind = request['request']['requestKind']['kind']
resource_version = request['request']['requestKind']['version']
resource_group = request['request']['requestKind']['group']
if semaphore_annotation in annotations:
if annotations[semaphore_annotation] == semaphore_red:
response = {
"apiVersion": "admission.k8s.io/v1",
"kind": "AdmissionReview",
"response": {
"uid": uid,
"allowed": False,
"status": {
"code": 403,
"message": "Resource %s (kind: %s, version: %s, group: %s) is annotated with %s, cannot be removed" % (resource_name, resource_kind, resource_version, resource_group, semaphore_annotation)
}
}
}
if debug:
print("/validate RES: %s" % response)
return json.dumps(response)
response = {
"apiVersion": "admission.k8s.io/v1",
"kind": "AdmissionReview",
"response": {
"uid": uid,
"allowed": True
}
}
if debug:
print("/validate RES: %s" % response)
return json.dumps(response)
if __name__ == "__main__":
if 'TLS_CERT_PATH' in os.environ.keys():
certificate_path = os.environ['TLS_CERT_PATH']
else:
certificate_path = "/etc/certs/cert.pem"
if 'TLS_KEY_PATH' in os.environ:
key_path = os.environ['TLS_KEY_PATH']
else:
key_path = "/etc/certs/key.pem"
HTTPServer.ssl_adapter = BuiltinSSLAdapter(
certificate=certificate_path,
private_key=key_path)
app = web.application(urls, globals())
app.run()