Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #67: Refactor window calculations for generality #68

Merged
merged 4 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 101 additions & 10 deletions bsk_rl/envs/general_satellite_tasking/scenario/sat_observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from Basilisk.utilities import orbitalMotion

from bsk_rl.envs.general_satellite_tasking.scenario.satellites import (
AccessSatellite,
ImagingSatellite,
Satellite,
)
Expand Down Expand Up @@ -117,6 +118,8 @@ def prop_fn(self):
for module in ["dynamics", "fsw"]:
if hasattr(getattr(self, module), prop):
return np.array(getattr(getattr(self, module), prop)) / norm
else:
raise AttributeError(f"Property {prop} not found")

prop_fn.__name__ = prop
if norm != 1:
Expand Down Expand Up @@ -182,7 +185,7 @@ def __init__(
if "location_norm" in kwargs:
warn(
"location_norm is ignored and should be specified in target_properties"
)
) # pragma: no cover
self.n_ahead_observe = int(n_ahead_observe)
self.target_obs_generator(target_properties)

Expand All @@ -193,30 +196,34 @@ def target_obs_generator(self, target_properties):

def target_obs(self):
obs = {}
for i, target in enumerate(self.upcoming_targets(self.n_ahead_observe)):
for i, opportunity in enumerate(
self.find_next_opportunities(
n=self.n_ahead_observe,
filter=self._get_imaged_filter(),
types="target",
)
):
props = {}
for prop_spec in target_properties:
name = prop_spec["prop"]
norm = prop_spec.get("norm", 1.0)
if name == "priority":
value = target.priority / norm
value = opportunity["target"].priority
elif name == "location":
value = target.location / norm
value = opportunity["target"].location
elif name == "window_open":
value = self.next_windows[target][0] - self.simulator.sim_time
value = opportunity["window"][0] - self.simulator.sim_time
elif name == "window_mid":
value = (
sum(self.next_windows[target]) / 2 - self.simulator.sim_time
)
value = sum(opportunity["window"]) / 2 - self.simulator.sim_time
elif name == "window_close":
value = self.next_windows[target][1] - self.simulator.sim_time
value = opportunity["window"][1] - self.simulator.sim_time
else:
raise ValueError(
f"Invalid target property: {prop_spec['prop']}"
)
if norm != 1.0:
name += "_normd"
props[name] = value
props[name] = value / norm
obs[f"target_{i}"] = props
return obs

Expand Down Expand Up @@ -244,3 +251,87 @@ def eclipse_state(self):
(eclipse_start - self.simulator.sim_time) / self.orbit_period_eclipse_norm,
(eclipse_end - self.simulator.sim_time) / self.orbit_period_eclipse_norm,
]


@configurable
class GroundStationState(SatObservation, AccessSatellite):
def __init__(
self,
*args,
n_ahead_observe_downlinks: int = 1,
downlink_window_properties: Optional[list[dict[str, Any]]] = None,
**kwargs,
):
"""Adds information about upcoming downlink opportunities to the observation
state.

Args:
n_ahead_observe: Number of upcoming downlink opportunities to consider.
target_properties: List of properties to include in the observation in the
format [dict(prop="prop_name", norm=norm)]. If norm is not specified, it
is set to 1.0 (no normalization). Properties to choose from:
- location
- window_open
- window_mid
- window_close
"""
super().__init__(*args, **kwargs)
if downlink_window_properties is None:
downlink_window_properties = [
dict(prop="window_open", norm=5700),
dict(prop="window_close", norm=5700),
]
self.ground_station_obs_generator(
downlink_window_properties, n_ahead_observe_downlinks
)

def reset_post_sim(self) -> None:
"""Add downlink ground stations to be considered by the access checker"""
for ground_station in self.simulator.environment.groundStations:
self.add_location_for_access_checking(
object=ground_station.ModelTag,
location=np.array(ground_station.r_LP_P_Init).flatten(),
min_elev=ground_station.minimumElevation,
type="ground_station",
)
super().reset_post_sim()

def ground_station_obs_generator(
self,
downlink_window_properties: list[dict[str, Any]],
n_ahead_observe_downlinks: int,
):
"""Generate the ground_station_obs function from the downlink_window_properties
spec and add it to the observation.
"""

def ground_station_obs(self):
obs = {}
for i, opportunity in enumerate(
self.find_next_opportunities(
n=n_ahead_observe_downlinks, types="ground_station"
)
):
props = {}
for prop_spec in downlink_window_properties:
name = prop_spec["prop"]
norm = prop_spec.get("norm", 1.0)
if name == "location":
value = opportunity["location"]
elif name == "window_open":
value = opportunity["window"][0] - self.simulator.sim_time
elif name == "window_mid":
value = sum(opportunity["window"]) / 2 - self.simulator.sim_time
elif name == "window_close":
value = opportunity["window"][1] - self.simulator.sim_time
else:
raise ValueError(
f"Invalid ground station property: {prop_spec['prop']}"
)
if norm != 1.0:
name += "_normd"
props[name] = value / norm
obs[f"ground_station_{i}"] = props
return obs

self.add_to_observation(bind(self, ground_station_obs, "ground_station_obs"))
Loading