Skip to content

Commit

Permalink
Issue #45: Added functions for Continuous Nadir Scanning
Browse files Browse the repository at this point in the history
  • Loading branch information
LorenzzoQM committed Sep 15, 2023
1 parent 65755e8 commit 8487b4b
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 2 deletions.
89 changes: 89 additions & 0 deletions bsk_rl/envs/general_satellite_tasking/scenario/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,92 @@ def _calc_reward(self, new_data_dict: dict[str, UniqueImageData]) -> float:
# reward += reward - self.data.rewards[target]
# self.data += new_data
# return reward

################
# Nadir Pointing#
################


class NadirScanningTimeData(DataType):
"""Base class for units of satellite data"""

def __init__(self, scanning_time_data: float = 0.0) -> None:
"""DataType to log data generated scanning nadir
Args:
nadirTime: Time spent pointing nadir
"""
self.scanning_time = scanning_time_data

def __add__(self, other: "NadirScanningTimeData") -> "NadirScanningTimeData":
"""Define the combination of two units of data"""
scanning_time = self.scanning_time + other.scanning_time

return self.__class__(scanning_time)


class ScanningNadirTimeStore(DataStore):
DataType = NadirScanningTimeData

def _get_log_state(self) -> LogStateType:
"""Returns the time spent scanning nadir based on the amount of data stored
and the instrument baud rate."""

storage_unit = self.satellite.dynamics.storageUnit.storageUnitDataOutMsg.read()
stored_amount = storage_unit.storageLevel
instrument_baudrate = self.satellite.dynamics.instrument.nodeBaudRate
print("instrument_baudrate: ", instrument_baudrate)
time_scanning = stored_amount / instrument_baudrate

# return amount of data stored
return time_scanning

def _compare_log_states(
self, amount_time_old: float, amount_time_new: float
) -> "NadirScanningTimeData":
"""Generate a unit of data based on previous step and current step logs
Args:
amount_time_old: Amount of time spent scanning nadir previously
amount_time_new: Amount of time spent scanning nadir
Returns:
DataType: Data generated
"""

data_generated = amount_time_new - amount_time_old

return NadirScanningTimeData(data_generated)


class NadirScanningManager(DataManager):
DataStore = ScanningNadirTimeStore # type of DataStore managed by the DataManager

def __init__(
self,
env_features: Optional["EnvironmentFeatures"] = None,
reward_fnc: Callable = lambda p: p,
) -> None:
"""
Args:
env_features: Information about the environment that can be collected as
data
rewardFnc: Reward as function of time spend pointing nadir
"""
super().__init__(env_features)
self.reward_fnc = reward_fnc

def _calc_reward(self, new_scanning_times: ["NadirScanningTimeData"]) -> float:
"""Calculate step reward based on all satellite data from a step
Args:
newTimeData: Satellite-DataType of new data from a step
Returns:
Step reward
"""
reward = 0.0
for scanning_time in new_scanning_times.values():
reward += self.reward_fnc(scanning_time.scanning_time)

return reward
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,15 @@ def regenerate_targets(self) -> None:
priority=self.priority_distribution(),
)
)


class NadirTarget(EnvironmentFeatures):
"""
Defines a nadir target center at the center of the planet.
"""

def __init__(self) -> None:
self.name = "nadir"
self.location = [0, 0, 0]
self.priority = 0
self.targets = [Target(self.name, self.location, self.priority)]
19 changes: 19 additions & 0 deletions bsk_rl/envs/general_satellite_tasking/scenario/sat_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,22 @@ def set_action(self, action: Union[int, Target, str]):
self.prev_action_key = self.image(action, self.prev_action_key)
else:
super().set_action(action)


class NadirImagingActions(DiscreteSatAction, ImagingSatellite):
def __init__(self, *args, **kwargs) -> None:
"""
Discrete action to image nadir.
"""
super().__init__(*args, **kwargs)

# Sets a static Nadir target
self.targets = Target(name="nadir", location=np.array([0, 0, 0.1]), priority=0)
self.add_action(self.nadirImage, n_actions=1, act_name="nadirImage")

def nadirImage(self, *args, prev_action_key=None, **kwargs) -> None:
"""Activate Nadir imaging action."""

if prev_action_key != "nadir_image":
self.task_target_for_imaging(self.targets, nadir_pointing=True)
return "nadir_image"
6 changes: 4 additions & 2 deletions bsk_rl/envs/general_satellite_tasking/scenario/satellites.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,16 +600,18 @@ def enable_target_window(self, target: Target):
extra_actions=[self._satellite_command + ".missed += 1"],
)

def task_target_for_imaging(self, target: Target):
def task_target_for_imaging(self, target: Target, nadir_pointing=False):
"""Task the satellite to image a target
Args:
target: Selected target
nadir_pointing: If false, enable the target window
"""
msg = f"{target} tasked for imaging"
self.log_info(msg)
self.fsw.action_image(target.location, target.id)
self.enable_target_window(target)
if not nadir_pointing:
self.enable_target_window(target)


#########################
Expand Down
101 changes: 101 additions & 0 deletions bsk_rl/envs/general_satellite_tasking/simulation/dynamics.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
simpleNav,
simplePowerSink,
simpleSolarPanel,
simpleStorageUnit,
spacecraft,
spacecraftLocation,
spaceToGroundTransmitter,
Expand Down Expand Up @@ -780,6 +781,106 @@ def reset_for_action(self) -> None:
self.instrumentPowerSink.powerStatus = 0


class ContinuousImagingDynModel(ImagingDynModel):
"""Equips the satellite with an instrument, storage unit, and transmitter
for continuous nadir imaging."""

@default_args(instrumentBaudRate=8e6)
def _set_instrument(
self, instrumentBaudRate: float, priority: int = 895, **kwargs
) -> None:
"""Create the continuous instrument model.
Args:
instrumentBaudRate: Data generated in step by continuous imaging [bits]
priority: Model priority.
"""
self.instrument = simpleInstrument.SimpleInstrument()
self.instrument.ModelTag = "instrument" + self.satellite.id
self.instrument.nodeBaudRate = instrumentBaudRate # make imaging instantaneous
self.instrument.nodeDataName = "Instrument" + self.satellite.id
self.simulator.AddModelToTask(
self.task_name, self.instrument, ModelPriority=priority
)

@default_args(dataStorageCapacity=20 * 8e6, bufferNames=None)
def _set_storage_unit(
self,
dataStorageCapacity: int,
transmitterNumBuffers: Optional[int] = None,
bufferNames: Optional[Iterable[str]] = None,
priority: int = 699,
**kwargs,
) -> None:
"""Configure the storage unit and its buffers.
Args:
dataStorageCapacity: Maximum data to be stored [bits]
transmitterNumBuffers: Number of unit buffers. Not necessary if bufferNames
given.
bufferNames: List of buffer names to use. Named by number if None.
priority: Model priority.
"""
self.storageUnit = simpleStorageUnit.SimpleStorageUnit()
self.storageUnit.ModelTag = "storageUnit" + self.satellite.id
self.storageUnit.storageCapacity = dataStorageCapacity # bits
self.storageUnit.addDataNodeToModel(self.instrument.nodeDataOutMsg)
self.storageUnit.addDataNodeToModel(self.transmitter.nodeDataOutMsg)

# Add the storage unit to the transmitter
self.transmitter.addStorageUnitToTransmitter(
self.storageUnit.storageUnitDataOutMsg
)

self.simulator.AddModelToTask(
self.task_name, self.storageUnit, ModelPriority=priority
)

@default_args(
groundLocationPlanetRadius=orbitalMotion.REQ_EARTH * 1e3,
imageTargetMinimumElevation=np.radians(0),
imageTargetMaximumRange=-1,
)
def _set_imaging_target(
self,
groundLocationPlanetRadius: float,
imageTargetMinimumElevation: float = np.radians(-89),
imageTargetMaximumRange: float = -1,
priority: int = 2000,
**kwargs,
) -> None:
"""Add a generic imaging target to dynamics. The target must be updated with a
particular location when used.
Args:
groundLocationPlanetRadius: Radius of ground locations from center of planet
[m]
imageTargetMinimumElevation: Minimum elevation angle from target to
satellite when imaging [rad]
imageTargetMaximumRange: Maximum range from target to satellite when
imaging. -1 to disable. [m]
priority: Model priority.
"""
self.imagingTarget = groundLocation.GroundLocation()
self.imagingTarget.ModelTag = "scanningTarget"
self.imagingTarget.planetRadius = 0.00001
self.imagingTarget.specifyLocation(0, 0, 0)
self.imagingTarget.planetInMsg.subscribeTo(
self.environment.gravFactory.spiceObject.planetStateOutMsgs[
self.environment.body_index
]
)
self.imagingTarget.minimumElevation = np.radians(-89)
self.imagingTarget.maximumRange = -1

self.simulator.AddModelToTask(
self.environment.env_task_name,
self.imagingTarget,
ModelPriority=priority,
)
self.imagingTarget.addSpacecraftToModel(self.scObject.scStateOutMsg)


class GroundStationDynModel(ImagingDynModel):
"""Model that connects satellite to environment ground stations"""

Expand Down
Loading

0 comments on commit 8487b4b

Please sign in to comment.