-
Notifications
You must be signed in to change notification settings - Fork 0
/
metrics-eval.py
268 lines (196 loc) · 10.2 KB
/
metrics-eval.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# This Python script calculates additional metrics from the
# 'final_evaluation.json' file in the experiment output directory and saves the
# data to the 'metrics-final_evaluation.json' file in the same directory.
#
# Some scripts related to plotting require this script to be run before
# plotting.
import argparse
from pathlib import Path
import logging
import os
import sys
import numpy as np
# Add the current directory (where Python is called) to sys.path. This is
# required to load modules in the project root directory (like dfaas_utils.py).
sys.path.append(os.getcwd())
import dfaas_utils
# Initialize logger for this module.
logging.basicConfig(format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d -- %(message)s", level=logging.DEBUG)
logger = logging.getLogger(Path(__file__).name)
def get_agents():
    """Return the list of agent IDs used in the experiment."""
    # TODO: Make dynamic instead of hard-coding two nodes.
    return [f"node_{idx}" for idx in range(2)]
def num_episodes(eval):
    """Return the number of episodes contained in the given evaluation data.

    Raises KeyError if the "episodes_this_iter" key is missing."""
    count = eval["episodes_this_iter"]
    return count
def average_reward_per_step(eval, metrics):
    """Store, for every episode, the per-agent average reward across the
    episode's steps under the "reward_average_per_step" key of metrics."""
    agents = get_agents()
    # One dict per episode, mapping each agent to its mean step reward.
    metrics["reward_average_per_step"] = [
        {agent: np.average(episode[agent]) for agent in agents}
        for episode in eval["hist_stats"]["reward"]
    ]
def _calc_excess_reject_step(action, excess, queue_capacity):
"""Returns the number of excess rejected requests for a single step for a
generic agent, using the given action, excess, and queue capacity. This is a
helper function that is used in calc_excess_reject().
The arguments are:
- action: a tuple of three sizes with the requests to locally process,
forward, and reject,
- excess: a tuple of length two with the excess number of locally
processed requests and forwarded rejected requests,
- queue_capacity: the queue capacity of the agent.
"""
local, forward, reject = action
excess_local, forward_reject = excess
free_slots = queue_capacity - local
if free_slots < 0:
# There is a over-local processing.
assert excess_local == -free_slots, f"Expected equal, found {excess_local} != {-free_slots}"
excess_reject = 0
elif free_slots >= reject:
# All rejected requests could have been processed locally.
excess_reject = reject
else:
# Some rejected requests could have been processed locally.
excess_reject = reject - (reject - free_slots)
reject -= excess_reject
if forward_reject == 0:
# All forwarded requests were not rejected. This means that the policy
# may have forwarded more requests instead of rejecting them
# immediately.
excess_reject += reject
return excess_reject
def calc_excess_reject(episode_data, agent, epi_idx):
    """Return a NumPy array with, for each step, the number of excess rejected
    requests (requests that could have been forwarded or processed locally)
    for the given agent in the episode selected by epi_idx.

    TODO: This function should not exist, the environment should provide this
    value."""
    input_reqs = episode_data["observation_input_requests"][epi_idx][agent]
    queue_capacity = episode_data["observation_queue_capacity"][epi_idx][agent]
    local_reqs = episode_data["action_local"][epi_idx][agent]
    reject_reqs = episode_data["action_reject"][epi_idx][agent]
    excess_local = episode_data["excess_local"][epi_idx][agent]
    forward_reqs = episode_data["action_forward"][epi_idx][agent]
    excess_forward_reject = episode_data["excess_forward_reject"][epi_idx][agent]

    steps = len(input_reqs)
    if len(forward_reqs) == 0:
        # Empty forward data means an asymmetric environment: substitute zero
        # arrays so the per-step calculation below still works.
        forward_reqs = np.zeros(steps, dtype=np.int32)
        excess_forward_reject = np.zeros(steps, dtype=np.int32)

    result = np.zeros(steps, dtype=np.int32)
    for step in range(steps):
        result[step] = _calc_excess_reject_step(
                (local_reqs[step], forward_reqs[step], reject_reqs[step]),
                (excess_local[step], excess_forward_reject[step]),
                queue_capacity[step])
    return result
def reqs_exceed_per_step(eval, metrics):
    """Calculate the following metrics, one value per step:

        1. Percentage of local requests that exceed queue capacity out of all
           local requests.

        2. Percentage of forwarded requests that are rejected out of all
           forwarded requests.

        3. Percentage of excess rejected requests out of all rejected
           requests."""
    agents = get_agents()

    # A zero action produces 0/0 = NaN (and inf for x/0): silence the NumPy
    # warning and convert the result to zero percent afterwards, since the
    # excess is also zero in that case.
    previous = np.seterr(invalid="ignore")

    per_episode_local, per_episode_reject, per_episode_forward = [], [], []
    for idx in range(num_episodes(eval)):
        hist = eval["hist_stats"]
        local_pct, reject_pct, forward_pct = {}, {}, {}
        for agent in agents:
            # Local excess percent (all operands are NumPy arrays).
            local_action = np.array(hist["action_local"][idx][agent], dtype=np.int32)
            local_excess = np.array(hist["excess_local"][idx][agent], dtype=np.int32)
            local_pct[agent] = np.nan_to_num(local_excess * 100 / local_action, posinf=0.0, neginf=0.0)

            # Reject excess percent.
            reject_excess = calc_excess_reject(hist, agent, idx)
            reject_action = np.array(hist["action_reject"][idx][agent], dtype=np.int32)
            reject_pct[agent] = np.nan_to_num(reject_excess * 100 / reject_action, posinf=0.0, neginf=0.0)

            # Rejected forwarded requests percent.
            fwd_rejected = np.array(hist["excess_forward_reject"][idx][agent], dtype=np.int32)
            fwd_action = np.array(hist["action_forward"][idx][agent], dtype=np.int32)
            if len(fwd_rejected) == 0:
                # Empty data means an asymmetric environment: this metric is
                # useless for this agent.
                forward_pct[agent] = 0
            else:
                forward_pct[agent] = np.nan_to_num(fwd_rejected * 100 / fwd_action, posinf=0.0, neginf=0.0)

        per_episode_local.append(local_pct)
        per_episode_reject.append(reject_pct)
        per_episode_forward.append(forward_pct)

    metrics["local_reqs_percent_excess_per_step"] = per_episode_local
    metrics["reject_reqs_percent_excess_per_step"] = per_episode_reject
    metrics["forward_reject_reqs_percent_per_step"] = per_episode_forward

    # Restore the previous NumPy error handling.
    np.seterr(invalid=previous["invalid"])
def average_reqs_percent_exceed_per_episode(eval, metrics):
    """For each episode, average over the steps the per-step excess
    percentages previously stored in metrics: rejected requests that could
    have been handled locally, local requests that exceed the local buffer,
    and rejected forwarded requests.

    Requires reqs_exceed_per_step() to have filled the per-step keys."""
    agents = get_agents()

    # Map each per-step metric key to its per-episode averaged counterpart.
    key_pairs = (
        ("local_reqs_percent_excess_per_step", "local_reqs_percent_excess_per_episode"),
        ("reject_reqs_percent_excess_per_step", "reject_reqs_percent_excess_per_episode"),
        ("forward_reject_reqs_percent_per_step", "forward_reject_reqs_percent_per_episode"),
    )
    episodes = range(num_episodes(eval))
    for src_key, dst_key in key_pairs:
        metrics[dst_key] = [
            {agent: np.average(metrics[src_key][idx][agent]) for agent in agents}
            for idx in episodes
        ]
def reject_reqs_excess_per_step(eval, metrics):
    """For each step, for each episode, store the absolute number of rejected
    requests that could have been handled locally but were rejected by the
    agent under the "reject_excess_per_step" key of metrics.

    TODO: This function should not exist, the environment should provide this
    value."""
    agents = get_agents()
    metrics["reject_excess_per_step"] = [
        {agent: calc_excess_reject(eval["hist_stats"], agent, idx) for agent in agents}
        for idx in range(num_episodes(eval))
    ]
def main(exp_dir):
    """Calculate the additional metrics for the experiment in exp_dir and save
    them to "metrics-final_evaluation.json" inside the same directory."""
    exp_dir = dfaas_utils.to_pathlib(exp_dir)
    eval = dfaas_utils.parse_result_file(exp_dir / "final_evaluation.json")[0]["evaluation"]

    # Each key in the metrics dictionary is one calculated metric; values may
    # be nested dictionaries or lists.
    metrics = {}
    for calculate in (average_reward_per_step,
                      reqs_exceed_per_step,
                      average_reqs_percent_exceed_per_episode,
                      reject_reqs_excess_per_step):
        calculate(eval, metrics)

    # Save the metrics dictionary to disk as a JSON file.
    metrics_path = exp_dir / "metrics-final_evaluation.json"
    dfaas_utils.dict_to_json(metrics, metrics_path)
    logger.info(f"Metrics data saved to: {metrics_path.as_posix()!r}")
if __name__ == "__main__":
    # Command line: the only (positional) argument is the experiment
    # directory containing final_evaluation.json.
    parser = argparse.ArgumentParser(prog="metrics-eval")
    parser.add_argument(dest="exp_directory",
                        help="Directory with the final_evaluation.json file")

    main(parser.parse_args().exp_directory)