-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathpaseos.py
420 lines (342 loc) · 15.7 KB
/
paseos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
import types
import asyncio
import sys
from dotmap import DotMap
from loguru import logger
import pykep as pk
from skspatial.objects import Sphere
from paseos.actors.base_actor import BaseActor
from paseos.activities.activity_manager import ActivityManager
from paseos.utils.operations_monitor import OperationsMonitor
class PASEOS:
    """Main user-facing interface of a PASEOS simulation.

    An instance is bound to exactly one local actor. It keeps the simulation
    clock, advances the local actor's models over time, stores the actors the
    local actor knows about, and forwards activity handling to an
    ``ActivityManager`` and status logging to an ``OperationsMonitor``.
    """

    # Config file of the simulation
    _cfg = None

    # Stores the simulation state (currently the simulation time in seconds)
    _state = None

    # Storing actors we know as dictionary (name -> actor) for easy access.
    # Does not include local actor.
    _known_actors = None

    # The actor of the device this is running on
    _local_actor = None

    # TODO replace this in the future depending on central body
    _central_body_sphere = None

    # Handles registered activities
    _activity_manager = None

    # Boolean flag tracking if an activity is currently running
    _is_running_activity = False

    # Boolean flag tracking if we are currently inside "advance_time"
    _is_advancing_time = False

    # Used to monitor the local actor over execution and write performance stats
    _operations_monitor = None

    # Simulated seconds since the last status log entry. Starts at the largest
    # float so that the very first timestep always triggers a log entry.
    _time_since_previous_log = sys.float_info.max

    def __init__(self, local_actor: BaseActor, cfg=None):
        """Initialize PASEOS.

        Args:
            local_actor (BaseActor): local actor.
            cfg (DotMap, optional): simulation configuration. The signature defaults
                to None for backward compatibility, but a configuration is required.

        Raises:
            ValueError: If no configuration was passed.
        """
        logger.trace("Initializing PASEOS")
        if cfg is None:
            # Previously this surfaced as an opaque AttributeError on `cfg.comm`.
            raise ValueError("PASEOS requires a simulation configuration (cfg), got None.")

        self._cfg = cfg
        self._central_body_sphere = Sphere([0, 0, 0], self._cfg.comm.central_body_LOS_radius)
        self._state = DotMap(_dynamic=False)
        self._state.time = self._cfg.sim.start_time
        self._known_actors = {}
        self._local_actor = local_actor

        # Update local actor time to simulation start time.
        self.local_actor.set_time(pk.epoch(self._cfg.sim.start_time * pk.SEC2DAY))

        # Set line of sight blocking sphere
        self.local_actor.set_central_body_shape(self._central_body_sphere)

        self._activity_manager = ActivityManager(
            self, self._cfg.sim.activity_timestep, self._cfg.sim.time_multiplier
        )
        self._operations_monitor = OperationsMonitor(self.local_actor.name)

    async def wait_for_activity(self):
        """Wait until the currently running activity has finished.

        Polls the running-activity flag every 0.1 s (wall-clock) and yields to
        the event loop in between.
        """
        while self._is_running_activity is True:
            await asyncio.sleep(0.1)

    def save_status_log_csv(self, filename) -> None:
        """Saves the status log incl. all kinds of information such as battery charge,
        running activity, etc.

        Args:
            filename (str): File to save the log in.
        """
        self._operations_monitor.save_to_csv(filename)

    def log_status(self):
        """Updates the status log with the local actor and the names of known actors."""
        self._operations_monitor.log(self.local_actor, self.known_actor_names)

    def advance_time(
        self,
        time_to_advance: float,
        current_power_consumption_in_W: float,
        constraint_function: types.FunctionType = None,
    ):
        """Advances the simulation by a specified amount of time.

        The time is advanced in steps of cfg.sim.dt (with a final, possibly smaller
        step). In every step the local actor's radiation, power and thermal models
        (if present) and its custom properties are updated.

        Args:
            time_to_advance (float): Time to advance in seconds.
            current_power_consumption_in_W (float): Current power consumed per second in Watt.
            constraint_function (FunctionType): Constraint function which will be evaluated
                every cfg.sim.activity_timestep seconds. Aborts the advancement if False.

        Returns:
            float: Time remaining to advance (or 0 if done). Non-zero if the
            constraint turned False, the actor is dead, died or was restarted.
        """
        assert (
            not self._is_advancing_time
        ), "advance_time is already running. This function is not thread-safe. Avoid mixing (async) activities and calling it."
        # Validate the arguments before taking the flag, so a rejected call
        # cannot leave the instance marked as "advancing".
        assert time_to_advance > 0, "Time to advance has to be positive."
        assert current_power_consumption_in_W >= 0, "Power consumption cannot be negative."

        self._is_advancing_time = True
        try:
            # Check constraint function returns something
            if constraint_function is not None:
                assert (
                    constraint_function() is not None
                ), "Your constraint function failed to return True or False."

            logger.debug("Advancing time by " + str(time_to_advance) + " s.")
            target_time = self._state.time + time_to_advance
            dt = self._cfg.sim.dt

            # Infinity forces a constraint evaluation in the very first iteration.
            time_since_constraint_check = float("inf")

            # Perform timesteps until target_time - dt reached,
            # then final smaller or equal timestep to reach target_time
            while self._state.time < target_time:
                # Check constraint function
                if (
                    constraint_function is not None
                    and time_since_constraint_check > self._cfg.sim.activity_timestep
                ):
                    time_since_constraint_check = 0
                    if not constraint_function():
                        logger.info("Time advancing interrupted. Constraint false.")
                        break

                if self._state.time > target_time - dt:
                    # compute final timestep to catch up
                    dt = target_time - self._state.time

                logger.trace(f"Time {self._state.time}, advancing {dt}")

                # Perform updates for local actor (e.g. charging)
                # Each actor only updates itself

                # check for device and / or activity failure
                if self.local_actor.has_radiation_model:
                    if self.local_actor.is_dead:
                        logger.warning(f"Tried to advance time on dead actor {self.local_actor}.")
                        return max(target_time - self._state.time, 0)
                    if self.local_actor._radiation_model.did_device_restart(dt):
                        logger.info(f"Actor {self.local_actor} interrupted during advance_time.")
                        self.local_actor.set_was_interrupted()
                        return max(target_time - self._state.time, 0)
                    if self.local_actor._radiation_model.did_device_experience_failure(dt):
                        logger.info(f"Actor {self.local_actor} died during advance_time.")
                        self.local_actor.set_is_dead()
                        return max(target_time - self._state.time, 0)

                # charge from current moment to time after timestep
                if self.local_actor.has_power_model:
                    self._local_actor.charge(dt)

                # Update actor temperature
                if self.local_actor.has_thermal_model:
                    self.local_actor._thermal_model.update_temperature(
                        dt, current_power_consumption_in_W
                    )

                # Update state of charge
                if self.local_actor.has_power_model:
                    self.local_actor.discharge(current_power_consumption_in_W, dt)

                # Update user-defined properties in the actor
                for property_name in self.local_actor.custom_properties.keys():
                    update_function = self.local_actor.get_custom_property_update_function(
                        property_name
                    )
                    new_value = update_function(
                        self.local_actor, dt, current_power_consumption_in_W
                    )
                    self.local_actor.set_custom_property(property_name, new_value)

                self._state.time += dt
                time_since_constraint_check += dt
                self.local_actor.set_time(pk.epoch(self._state.time * pk.SEC2DAY))

                # Check if we should update the status log
                if self._time_since_previous_log > self._cfg.io.logging_interval:
                    self.log_status()
                    self._time_since_previous_log = 0
                else:
                    self._time_since_previous_log += dt

            logger.debug("New time is: " + str(self._state.time) + " s.")
            return max(target_time - self._state.time, 0)
        finally:
            # Always release the flag: early returns (dead / restarted actor) and
            # exceptions would otherwise block every later call to advance_time.
            self._is_advancing_time = False

    def add_known_actor(self, actor: BaseActor):
        """Adds an actor to the known actors of this simulation.

        Args:
            actor (BaseActor): Actor to add

        Raises:
            ValueError: If an actor with the same name is already known.
        """
        logger.debug("Adding actor:" + str(actor))
        logger.debug("Current actors: " + str(self._known_actors.keys()))

        # Check for duplicate actors by name
        if actor.name in self._known_actors:
            raise ValueError("Trying to add already existing actor with name: " + actor.name)

        # Else add
        self._known_actors[actor.name] = actor

    def model_data_corruption(self, data_shape: list, exposure_period_in_s: float):
        """Computes a boolean mask for each data element that has been corrupted.

        Delegates to the radiation model of the local actor.

        Args:
            data_shape (list): Shape of the data to corrupt.
            exposure_period_in_s (float): Period of radiation exposure.

        Returns:
            np.array: Boolean mask which is True if an entry was corrupted.

        Raises:
            ValueError: If the local actor has no radiation model.
        """
        if not self.local_actor.has_radiation_model:
            raise ValueError(
                f"Actor {self.local_actor} has no radiation model. Set on up with ActorBuilder"
                + ".set_radiation_model() first to be able to corrupt data."
            )
        assert exposure_period_in_s > 0, "Exposure period must be positive."

        return self.local_actor._radiation_model.model_data_corruption(
            data_shape=data_shape, exposure_period_in_s=exposure_period_in_s
        )

    @property
    def simulation_time(self) -> float:
        """Get the current simulation time of this paseos instance in seconds since start.

        Returns:
            float: Time since start in seconds.
        """
        return self._state.time

    @property
    def local_time(self) -> pk.epoch:
        """Returns local time of the actor as pykep epoch. Use e.g. epoch.mjd2000 to get time in days.

        Returns:
            pk.epoch: local time of the actor
        """
        return self.local_actor.local_time

    @property
    def monitor(self):
        """Access paseos operations monitor which tracks local actor attributes such as temperature or state of charge.

        Returns:
            OperationsMonitor: Monitor object.
        """
        return self._operations_monitor

    @property
    def is_running_activity(self):
        """Allows checking whether there is currently an activity running.

        Returns:
            bool: True if running an activity.
        """
        return self._is_running_activity

    @property
    def local_actor(self) -> BaseActor:
        """Returns the local actor.

        Returns:
            BaseActor: Local actor
        """
        return self._local_actor

    @property
    def known_actors(self) -> dict:
        """Returns known actors.

        Returns:
            dict of BaseActor: Dictionary of the known actors, keyed by actor name.
        """
        return self._known_actors

    @property
    def known_actor_names(self) -> list:
        """Returns names of known actors.

        Returns:
            list: List of names of known actors. This is a snapshot; it does not
            change when actors are added or removed afterwards.
        """
        # Return a real list (as annotated) instead of a live dict view, so
        # stored results (e.g. in the status log) are not mutated later on.
        return list(self._known_actors)

    def empty_known_actors(self):
        """Clears the list of known actors."""
        self._known_actors = {}

    def remove_known_actor(self, actor_name: str):
        """Remove an actor from the list of known actors.

        Args:
            actor_name (str): name of the actor to remove.
        """
        assert (
            actor_name in self.known_actors
        ), f"Actor {actor_name} is not in known. Available are {self.known_actors.keys()}"
        del self._known_actors[actor_name]

    def remove_activity(self, name: str):
        """Removes a registered activity.

        Args:
            name (str): Name of the activity.
        """
        self._activity_manager.remove_activity(name=name)

    def register_activity(
        self,
        name: str,
        activity_function: types.CoroutineType,
        power_consumption_in_watt: float,
        on_termination_function: types.CoroutineType = None,
        constraint_function: types.CoroutineType = None,
    ):
        """Registers an activity that can then be performed on the local actor.

        Args:
            name (str): Name of the activity.
            activity_function (types.CoroutineType): Function to execute during the activity.
                Needs to be async. Can accept a list of arguments to be specified later.
            power_consumption_in_watt (float): Power consumption of the activity in W (per second).
            on_termination_function (types.CoroutineType): Function to execute when the activities stops
                (either due to completion or constraint not being satisfied anymore). Needs to be async.
                Can accept a list of arguments to be specified later.
            constraint_function (types.CoroutineType): Function to evaluate if constraints are still valid.
                Should return True if constraints are valid, False if they aren't. Needs to be async.
                Can accept a list of arguments to be specified later.
        """
        # Check provided functions are coroutines. The activity function is
        # mandatory, the other two are only checked when given.
        checked_functions = (
            ("activity function", activity_function, True),
            ("constraint function", constraint_function, False),
            ("on_termination function", on_termination_function, False),
        )
        for label, function, required in checked_functions:
            if function is None and not required:
                continue
            error = (
                f"The {label} needs to be a coroutine. "
                "For more information see https://docs.python.org/3/library/asyncio-task.html"
            )
            assert asyncio.iscoroutinefunction(function), error

        self._activity_manager.register_activity(
            name=name,
            activity_function=activity_function,
            power_consumption_in_watt=power_consumption_in_watt,
            on_termination_function=on_termination_function,
            constraint_function=constraint_function,
        )

    def perform_activity(
        self,
        name: str,
        activity_func_args: list = None,
        termination_func_args: list = None,
        constraint_func_args: list = None,
    ):
        """Perform the specified activity. Will advance the simulation if automatic clock is not disabled.

        Args:
            name (str): Name of the activity
            activity_func_args (list, optional): Arguments forwarded for the activity
                function. Defaults to None.
            termination_func_args (list, optional): Arguments forwarded for the
                on_termination function. Defaults to None.
            constraint_func_args (list, optional): Arguments forwarded for the
                constraint function. Defaults to None.

        Returns:
            Result of the activity manager's perform_activity call
            (documented upstream as bool: whether the activity was performed successfully).

        Raises:
            RuntimeError: If an activity is already running.
        """
        if self._is_running_activity:
            raise RuntimeError(
                "PASEOS is already running an activity. Please wait for it to finish. "
                + "To perform activities in parallen encasulate them in one, single joint activity."
            )

        self._is_running_activity = True
        try:
            # NOTE(review): the flag is not cleared here on success; presumably the
            # activity manager resets it once the activity finishes - confirm.
            return self._activity_manager.perform_activity(
                name=name,
                activity_func_args=activity_func_args,
                termination_func_args=termination_func_args,
                constraint_func_args=constraint_func_args,
            )
        except Exception:
            # The activity could not be started / failed: release the flag so
            # later activities are not rejected as "already running".
            self._is_running_activity = False
            raise

    def set_central_body(self, planet: pk.planet):
        """Sets the central body of the simulation for the orbit simulation.

        Args:
            planet (pk.planet): The central body as a pykep planet
        """
        # str() is required: concatenating str and a planet object raises TypeError.
        logger.debug("Setting central body to " + str(planet))
        self._state.central_body = planet

    def get_cfg(self) -> DotMap:
        """Returns the current cfg of the simulation.

        Returns:
            DotMap: cfg
        """
        return self._cfg