-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsuggester.py
78 lines (63 loc) · 2.68 KB
/
suggester.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import time
from urllib3.exceptions import InsecureRequestWarning
import querier
class Suggester:
    """Query Prometheus for one workload and return current vs. suggested resources.

    The per-metric query definitions come from ``querier.queries[k8s_flavour]``.
    Each definition is a mapping with a ``"query_type"`` (``"query"`` or
    ``"query_range"``) and a ``"query"`` format string that may reference
    ``{workload}``, ``{namespace}`` and ``{time_range}``.
    """

    # Seconds per single-letter Prometheus duration unit
    # (Prometheus defines a week as 7d and a year as 365d).
    _SECONDS_PER_UNIT = {
        "s": 1,
        "m": 60,
        "h": 60 * 60,
        "d": 24 * 60 * 60,
        "w": 7 * 24 * 60 * 60,
        "y": 365 * 24 * 60 * 60,
    }

    def __init__(self, prometheus_host, token, k8s_flavour, time_range='7d', step='3h'):
        """Store connection settings and pre-compute the range-query window.

        Args:
            prometheus_host: Base URL; ``/api/v1/<query_type>`` is appended to it.
            token: Bearer token sent in the ``Authorization`` header.
            k8s_flavour: Key selecting a query set from ``querier.queries``.
            time_range: Look-back window as ``<integer><unit>`` with unit one of
                ``s``, ``m``, ``h``, ``d``, ``w``, ``y`` (for example ``'7d'``).
            step: Resolution step passed through unchanged to range queries.

        Raises:
            KeyError: If ``k8s_flavour`` is unknown, or a ``query_range`` metric
                has no entry in ``querier.aggregators``.
            ValueError: If ``time_range`` is not ``<integer><unit>``.
        """
        self.prometheus_host = prometheus_host
        self.token = token
        self.time_range = time_range
        self.queries = querier.queries[k8s_flavour]
        self.end = int(time.time())
        # Honour the unit suffix so the start/end window matches the
        # ``[time_range]`` selector used inside the queries ('12h' is twelve
        # hours, not twelve days).
        self.start = self.end - self._duration_seconds(time_range)
        self.step = step
        # Only range queries get an aggregator; other metrics are looked up
        # with ``.get`` later and receive ``None``.
        self.aggregator = {
            metric: querier.aggregators[metric]
            for metric, query_struct in self.queries.items()
            if query_struct["query_type"] == 'query_range'
        }

    @classmethod
    def _duration_seconds(cls, duration):
        """Convert a ``<integer><unit>`` duration such as ``'7d'`` into seconds."""
        unit = duration[-1:]
        try:
            seconds_per_unit = cls._SECONDS_PER_UNIT[unit]
            count = int(duration[:-1])
        except (KeyError, ValueError):
            raise ValueError(
                f"Unsupported time range {duration!r}: expected <integer><unit> "
                f"with unit in {sorted(cls._SECONDS_PER_UNIT)}"
            ) from None
        return count * seconds_per_unit

    def build_query(self, k8s_object, metric):
        """Build the request URL and query parameters for ``metric``.

        Args:
            k8s_object: Indexable record; index 0 is used as the namespace and
                index 9 is substituted for ``{workload}`` (presumably a workload
                selector -- confirm against the producer of these records).
            metric: Key into ``self.queries``.

        Returns:
            A ``(host, params)`` tuple. ``params`` always contains ``"query"``;
            for ``query_range`` it also contains ``"start"``, ``"end"`` and
            ``"step"``.

        Raises:
            ValueError: If the metric's ``query_type`` is neither ``"query"``
                nor ``"query_range"``.
        """
        query_struct = self.queries[metric]
        query_type = query_struct["query_type"]
        host = f"{self.prometheus_host}/api/v1/{query_type}"
        if query_type not in ("query", "query_range"):
            # Fail with a clear message instead of an unbound-local error.
            raise ValueError(
                f"Unsupported query type {query_type!r} for metric {metric!r}"
            )

        params = {
            "query": query_struct["query"].format(
                workload=k8s_object[9],
                namespace=k8s_object[0],
                time_range=self.time_range,
            )
        }
        if query_type == "query_range":
            params.update(start=self.start, end=self.end, step=self.step)
        return host, params

    def suggest_values(self, k8s_object):
        """Run every configured query for ``k8s_object`` and build a result row.

        Each metric's response is passed to ``querier.handle_response`` together
        with the metric's aggregator (``None`` when the metric has none).

        Returns:
            A list that interleaves the current values taken from ``k8s_object``
            with the suggested values for ``cpu_request``, ``cpu_limit``,
            ``memory_request`` and ``memory_limit``.

        Raises:
            KeyError: If one of those four metrics is not in ``self.queries``.
        """
        headers = {
            'Authorization': f'Bearer {self.token}'
        }
        results = {}
        for metric in self.queries:
            host, params = self.build_query(k8s_object, metric)
            response = querier.run_query(host, headers, params)
            results[metric] = querier.handle_response(
                metric, response, self.aggregator.get(metric)
            )

        return [
            k8s_object[0],  # namespace
            k8s_object[1],  # type
            k8s_object[2],  # name
            k8s_object[3],  # container
            k8s_object[4],  # replicas
            k8s_object[5],  # current CPU request
            results["cpu_request"],  # suggested CPU request
            k8s_object[6],  # current CPU limit
            results["cpu_limit"],  # suggested CPU limit
            k8s_object[7],  # current memory request
            results["memory_request"],  # suggested memory request
            k8s_object[8],  # current memory limit
            results["memory_limit"],  # suggested memory limit
        ]