Skip to content

Commit

Permalink
Issue #67: Create GroundStationState
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark2000 committed Sep 20, 2023
1 parent 4723af4 commit 5bb7db4
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 4 deletions.
85 changes: 85 additions & 0 deletions bsk_rl/envs/general_satellite_tasking/scenario/sat_observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from Basilisk.utilities import orbitalMotion

from bsk_rl.envs.general_satellite_tasking.scenario.satellites import (
AccessSatellite,
ImagingSatellite,
Satellite,
)
Expand Down Expand Up @@ -250,3 +251,87 @@ def eclipse_state(self):
(eclipse_start - self.simulator.sim_time) / self.orbit_period_eclipse_norm,
(eclipse_end - self.simulator.sim_time) / self.orbit_period_eclipse_norm,
]


@configurable
class GroundStationState(SatObservation, AccessSatellite):
def __init__(
self,
*args,
n_ahead_observe_downlinks: int = 1,
downlink_window_properties: Optional[list[dict[str, Any]]] = None,
**kwargs,
):
"""Adds information about upcoming downlink opportunities to the observation
state.
Args:
n_ahead_observe: Number of upcoming downlink opportunities to consider.
target_properties: List of properties to include in the observation in the
format [dict(prop="prop_name", norm=norm)]. If norm is not specified, it
is set to 1.0 (no normalization). Properties to choose from:
- location
- window_open
- window_mid
- window_close
"""
super().__init__(*args, **kwargs)
if downlink_window_properties is None:
downlink_window_properties = [
dict(prop="window_open", norm=5700),
dict(prop="window_close", norm=5700),
]
self.ground_station_obs_generator(
downlink_window_properties, n_ahead_observe_downlinks
)

def reset_post_sim(self) -> None:
"""Add downlink ground stations to be considered by the access checker"""
for ground_station in self.simulator.environment.groundStations:
self.add_location_for_access_checking(
object=ground_station.ModelTag,
location=np.array(ground_station.r_LP_P_Init).flatten(),
min_elev=ground_station.minimumElevation,
type="ground_station",
)
super().reset_post_sim()

def ground_station_obs_generator(
self,
downlink_window_properties: list[dict[str, Any]],
n_ahead_observe_downlinks: int,
):
"""Generate the ground_station_obs function from the downlink_window_properties
spec and add it to the observation.
"""

def ground_station_obs(self):
obs = {}
for i, opportunity in enumerate(
self.find_next_opportunities(
n=n_ahead_observe_downlinks, types="ground_station"
)
):
props = {}
for prop_spec in downlink_window_properties:
name = prop_spec["prop"]
norm = prop_spec.get("norm", 1.0)
if name == "location":
value = opportunity["location"]
elif name == "window_open":
value = opportunity["window"][0] - self.simulator.sim_time
elif name == "window_mid":
value = sum(opportunity["window"]) / 2 - self.simulator.sim_time
elif name == "window_close":
value = opportunity["window"][1] - self.simulator.sim_time
else:
raise ValueError(
f"Invalid ground station property: {prop_spec['prop']}"
)
if norm != 1.0:
name += "_normd"
props[name] = value / norm
obs[f"ground_station_{i}"] = props
return obs

self.add_to_observation(bind(self, ground_station_obs, "ground_station_obs"))
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,32 @@ def test_eclipse_state(self):
observation2, reward, terminated, truncated, info = self.env.step(0)
assert (observation2[0] - observation1[0]) < 0.05
assert (observation2[1] - observation1[1]) < 0.05
assert (observation2[1] - observation1[1]) < 0.05


class TestGroundStationState:
class GroundSat(
sa.DriftAction, so.GroundStationState.configure(n_ahead_observe_downlinks=2)
):
dyn_type = dynamics.GroundStationDynModel
fsw_type = fsw.ImagingFSWModel

env = gym.make(
"SingleSatelliteTasking-v1",
satellites=GroundSat(
"Satellite",
obs_type=list,
sat_args=GroundSat.default_sat_args(oe=random_orbit()),
),
env_type=environment.GroundStationEnvModel,
env_args=environment.GroundStationEnvModel.default_env_args(),
env_features=StaticTargets(n_targets=0),
data_manager=data.NoDataManager(),
sim_rate=1.0,
max_step_duration=5700.0,
time_limit=5700.0,
disable_env_checker=True,
)

def test_ground_station_state(self):
observation, info = self.env.reset()
assert sum(observation) > 0 # Check that there are downlink opportunities
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def test_target_state_normed(self, sat_init):
target_properties=[
dict(prop="priority"),
dict(prop="location", norm=10.0),
dict(prop="window_open"),
dict(prop="window_open", norm=10.0),
dict(prop="window_mid"),
dict(prop="window_close"),
],
Expand All @@ -168,14 +168,14 @@ def test_target_state_normed(self, sat_init):
target_0=dict(
priority=0.0,
location_normd=np.array([0.0, 0.1, 0.0]),
window_open=5.0,
window_open_normd=5.0 / 10.0,
window_mid=10.0,
window_close=15.0,
),
target_1=dict(
priority=1.0,
location_normd=np.array([0.1, 0.1, 0.0]),
window_open=5.0,
window_open_normd=5.0 / 10.0,
window_mid=10.0,
window_close=15.0,
),
Expand Down Expand Up @@ -219,6 +219,103 @@ def test_eclipse_state(self, sat_init):
assert sat.eclipse_state() == [0.1, 0.2]


@patch.multiple(so.GroundStationState, __abstractmethods__=set())
@patch(
"bsk_rl.envs.general_satellite_tasking.scenario.satellites.AccessSatellite.__init__"
)
class TestGroundStationState:
def test_init(self, sat_init):
sat = so.GroundStationState()
sat_init.assert_called_once()
assert len(sat.obs_fn_list) == 1
assert sat.obs_fn_list[0].__name__ == "ground_station_obs"

def test_ground_station_state(self, sat_init):
n_ahead = 2
sat = so.GroundStationState(n_ahead_observe_downlinks=n_ahead)
sat.simulator = MagicMock(sim_time=0.0)
sat.find_next_opportunities = MagicMock(
return_value=[
dict(
ground_station=f"Boulder_{i}",
location=np.array([0.0, 0.0, 0.0]),
window=(i + 0.0, i + 1.0),
)
for i in range(n_ahead)
]
)
expected = dict(
ground_station_0=dict(window_open_normd=0.0, window_close_normd=1.0 / 5700),
ground_station_1=dict(
window_open_normd=1.0 / 5700, window_close_normd=2.0 / 5700
),
)
for k1, v1 in sat.ground_station_obs().items():
for k2, v2 in v1.items():
assert np.all(v2 == expected[k1][k2])

def test_target_state_normed(self, sat_init):
n_ahead = 2
sat = so.GroundStationState(
n_ahead_observe_downlinks=n_ahead,
downlink_window_properties=[
dict(prop="location", norm=10.0),
dict(prop="window_open", norm=10.0),
dict(prop="window_mid"),
dict(prop="window_close"),
],
)

sat.opportunities = [
dict(
ground_station="Boulder",
window=(10.0, 20.0),
type="ground_station",
location=np.array([0.0, 0.0, 0.0]),
),
dict(
ground_station="Pleasanton",
window=(20.0, 30.0),
type="ground_station",
location=np.array([1.0, 1.0, 1.0]),
),
]
sat.simulator = MagicMock(sim_time=5.0)

expected = dict(
ground_station_0=dict(
location_normd=np.array([0.0, 0.0, 0.0]),
window_open_normd=5.0 / 10.0,
window_mid=10.0,
window_close=15.0,
),
ground_station_1=dict(
location_normd=np.array([0.1, 0.1, 0.1]),
window_open_normd=15.0 / 10.0,
window_mid=20.0,
window_close=25.0,
),
)
for k1, v1 in sat.ground_station_obs().items():
for k2, v2 in v1.items():
print(v2, expected[k1][k2])
assert np.all(v2 == expected[k1][k2])

def test_bad_ground_station_state(self, sat_init):
n_ahead = 2
sat = so.GroundStationState(
n_ahead_observe_downlinks=n_ahead,
downlink_window_properties=[
dict(prop="not_a_prop"),
],
)
sat.find_next_opportunities = MagicMock(
return_value=[dict(ground_station=MagicMock()) for i in range(n_ahead)]
)
with pytest.raises(ValueError):
sat.ground_station_obs()


@patch.multiple(so.NormdPropertyState, __abstractmethods__=set())
@patch.multiple(so.TimeState, __abstractmethods__=set())
@patch.multiple(so.TargetState, __abstractmethods__=set())
Expand Down

0 comments on commit 5bb7db4

Please sign in to comment.