Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/#45 nadir pointing science mode #62

Merged
merged 2 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions bsk_rl/envs/general_satellite_tasking/scenario/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,98 @@ def _calc_reward(self, new_data_dict: dict[str, UniqueImageData]) -> float:
# reward += reward - self.data.rewards[target]
# self.data += new_data
# return reward

#################
# Nadir Pointing#
#################


class NadirScanningTimeData(DataType):
def __init__(self, scanning_time: float = 0.0) -> None:
"""DataType to log data generated scanning nadir

Args:
scanning_time: Time spent scanning nadir
"""
self.scanning_time = scanning_time

def __add__(self, other: "NadirScanningTimeData") -> "NadirScanningTimeData":
"""Define the combination of two units of data"""
scanning_time = self.scanning_time + other.scanning_time

return self.__class__(scanning_time)


class ScanningNadirTimeStore(DataStore):
DataType = NadirScanningTimeData

def _get_log_state(self) -> LogStateType:
"""Returns the amount of data stored in the storage unit."""

storage_unit = self.satellite.dynamics.storageUnit.storageUnitDataOutMsg.read()
stored_amount = storage_unit.storageLevel

# return amount of data stored
return stored_amount

def _compare_log_states(
Mark2000 marked this conversation as resolved.
Show resolved Hide resolved
self, old_state: float, new_state: float
) -> "NadirScanningTimeData":
"""Generate a unit of data based on previous step and current step stored
data amount.

Args:
old_state: Previous amount of data in the storage unit
new_state: Current amount of data in the storage unit

Returns:
DataType: Data generated
"""
instrument_baudrate = self.satellite.dynamics.instrument.nodeBaudRate

if new_state > old_state:
data_generated = (new_state - old_state) / instrument_baudrate
else:
data_generated = 0.0

return NadirScanningTimeData(data_generated)


class NadirScanningManager(DataManager):
DataStore = ScanningNadirTimeStore # type of DataStore managed by the DataManager

def __init__(
self,
env_features: Optional["EnvironmentFeatures"] = None,
reward_fn: Callable = None,
) -> None:
"""
Args:
env_features: Information about the environment that can be collected as
data
reward_fn: Reward as function of time spend pointing nadir
"""
super().__init__(env_features)
if reward_fn is None:

def reward_fn(p):
# Reward as a function of time send pointing nadir (p) and value
# per second
return p * self.env_features.value_per_second

self.reward_fn = reward_fn

def _calc_reward(self, new_data_dict: ["NadirScanningTimeData"]) -> float:
"""Calculate step reward based on all satellite data from a step

Args:
new_data_dict (dict): Satellite-DataType of new data from a step

Returns:
Step reward
"""
reward = 0.0
for scanning_time in new_data_dict.values():
reward += self.reward_fn(scanning_time.scanning_time)

return reward
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,17 @@ def regenerate_targets(self) -> None:
priority=self.priority_distribution(),
)
)


class UniformNadirFeature(EnvironmentFeatures):
"""
Defines a nadir target center at the center of the planet.
"""

def __init__(self, value_per_second: float = 1.0) -> None:
""" "
Args:
value_per_second: Amount of reward per second imaging nadir.
"""
self.name = "NadirFeature"
self.value_per_second = value_per_second
3 changes: 3 additions & 0 deletions bsk_rl/envs/general_satellite_tasking/scenario/sat_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,6 @@ def set_action(self, action: Union[int, Target, str]):
self.prev_action_key = self.image(action, self.prev_action_key)
else:
super().set_action(action)


NadirImagingAction = fsw_action_gen("action_nadir_scan")
88 changes: 88 additions & 0 deletions bsk_rl/envs/general_satellite_tasking/simulation/dynamics.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
simpleNav,
simplePowerSink,
simpleSolarPanel,
simpleStorageUnit,
spacecraft,
spacecraftLocation,
spaceToGroundTransmitter,
Expand Down Expand Up @@ -786,6 +787,93 @@ def reset_for_action(self) -> None:
self.instrumentPowerSink.powerStatus = 0


class ContinuousImagingDynModel(ImagingDynModel):
"""Equips the satellite with an instrument, storage unit, and transmitter
for continuous nadir imaging."""

@default_args(instrumentBaudRate=8e6)
def _set_instrument(
self, instrumentBaudRate: float, priority: int = 895, **kwargs
) -> None:
"""Create the continuous instrument model.

Args:
instrumentBaudRate: Data generated in step by continuous imaging [bits]
priority: Model priority.
"""
self.instrument = simpleInstrument.SimpleInstrument()
self.instrument.ModelTag = "instrument" + self.satellite.id
self.instrument.nodeBaudRate = instrumentBaudRate # make imaging instantaneous
self.instrument.nodeDataName = "Instrument" + self.satellite.id
self.simulator.AddModelToTask(
self.task_name, self.instrument, ModelPriority=priority
)

@default_args(dataStorageCapacity=20 * 8e6)
def _set_storage_unit(
self,
dataStorageCapacity: int,
priority: int = 699,
**kwargs,
) -> None:
"""Configure the storage unit and its buffers.

Args:
dataStorageCapacity: Maximum data to be stored [bits]
priority: Model priority.
"""
self.storageUnit = simpleStorageUnit.SimpleStorageUnit()
self.storageUnit.ModelTag = "storageUnit" + self.satellite.id
self.storageUnit.storageCapacity = dataStorageCapacity # bits
self.storageUnit.addDataNodeToModel(self.instrument.nodeDataOutMsg)
self.storageUnit.addDataNodeToModel(self.transmitter.nodeDataOutMsg)

# Add the storage unit to the transmitter
self.transmitter.addStorageUnitToTransmitter(
self.storageUnit.storageUnitDataOutMsg
)

self.simulator.AddModelToTask(
self.task_name, self.storageUnit, ModelPriority=priority
)

@default_args(
imageTargetMaximumRange=-1,
)
def _set_imaging_target(
self,
imageTargetMaximumRange: float = -1,
priority: int = 2000,
**kwargs,
) -> None:
"""Add a generic imaging target to dynamics. The target must be updated with a
particular location when used.

Args:
imageTargetMaximumRange: Maximum range from target to satellite when
imaging. -1 to disable. [m]
priority: Model priority.
"""
self.imagingTarget = groundLocation.GroundLocation()
self.imagingTarget.ModelTag = "scanningTarget"
self.imagingTarget.planetRadius = 1e-6
self.imagingTarget.specifyLocation(0, 0, 0)
self.imagingTarget.planetInMsg.subscribeTo(
self.environment.gravFactory.spiceObject.planetStateOutMsgs[
self.environment.body_index
]
)
self.imagingTarget.minimumElevation = np.radians(-90)
self.imagingTarget.maximumRange = imageTargetMaximumRange

self.simulator.AddModelToTask(
self.environment.env_task_name,
self.imagingTarget,
ModelPriority=priority,
)
self.imagingTarget.addSpacecraftToModel(self.scObject.scStateOutMsg)


class GroundStationDynModel(ImagingDynModel):
"""Model that connects satellite to environment ground stations"""

Expand Down
112 changes: 96 additions & 16 deletions bsk_rl/envs/general_satellite_tasking/simulation/fsw.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
mrpSteering,
rateServoFullNonlinear,
rwMotorTorque,
scanningInstrumentController,
simpleInstrumentController,
thrForceMapping,
thrMomentumDumping,
Expand Down Expand Up @@ -575,7 +576,7 @@ def _make_task_list(self) -> list[Task]:
def _set_gateway_msgs(self) -> None:
super()._set_gateway_msgs()
self.dynamics.instrument.nodeStatusInMsg.subscribeTo(
self.simpleInsControlConfig.deviceCmdOutMsg
self.insControlConfig.deviceCmdOutMsg
)

class LocPointTask(Task):
Expand All @@ -597,13 +598,13 @@ def create_module_data(self) -> None:
self.locPointWrap.ModelTag = "locPoint"

# SimpleInstrumentController configuration
self.simpleInsControlConfig = (
self.fsw.simpleInsControlConfig
self.insControlConfig = (
self.fsw.insControlConfig
) = simpleInstrumentController.simpleInstrumentControllerConfig()
self.simpleInsControlWrap = (
self.fsw.simpleInsControlWrap
) = self.fsw.simulator.setModelDataWrap(self.simpleInsControlConfig)
self.simpleInsControlWrap.ModelTag = "instrumentController"
self.insControlWrap = (
self.fsw.insControlWrap
) = self.fsw.simulator.setModelDataWrap(self.insControlConfig)
self.insControlWrap.ModelTag = "instrumentController"

def init_objects(self, **kwargs) -> None:
self._set_location_pointing(**kwargs)
Expand Down Expand Up @@ -652,23 +653,23 @@ def _set_instrument_controller(
imageRateErrorRequirement: Rate tolerance for imaging. Disable with
None. [rad/s]
"""
self.simpleInsControlConfig.attErrTolerance = imageAttErrorRequirement
self.insControlConfig.attErrTolerance = imageAttErrorRequirement
if imageRateErrorRequirement is not None:
self.simpleInsControlConfig.useRateTolerance = 1
self.simpleInsControlConfig.rateErrTolerance = imageRateErrorRequirement
self.simpleInsControlConfig.attGuidInMsg.subscribeTo(self.fsw.attGuidMsg)
self.simpleInsControlConfig.locationAccessInMsg.subscribeTo(
self.insControlConfig.useRateTolerance = 1
self.insControlConfig.rateErrTolerance = imageRateErrorRequirement
self.insControlConfig.attGuidInMsg.subscribeTo(self.fsw.attGuidMsg)
self.insControlConfig.locationAccessInMsg.subscribeTo(
self.fsw.dynamics.imagingTarget.accessOutMsgs[-1]
)

self._add_model_to_task(
self.simpleInsControlWrap, self.simpleInsControlConfig, priority=987
self.insControlWrap, self.insControlConfig, priority=987
)

def reset_for_action(self) -> None:
self.fsw.dynamics.imagingTarget.Reset(self.fsw.simulator.sim_time_ns)
self.locPointWrap.Reset(self.fsw.simulator.sim_time_ns)
self.simpleInsControlConfig.controllerStatus = 0
self.insControlConfig.controllerStatus = 0
return super().reset_for_action()

@action
Expand All @@ -679,11 +680,11 @@ def action_image(self, location: Iterable[float], data_name: str) -> None:
location: PCPF target location [m]
data_name: Data buffer to store image data to
"""
self.simpleInsControlConfig.controllerStatus = 1
self.insControlConfig.controllerStatus = 1
self.dynamics.instrumentPowerSink.powerStatus = 1
self.dynamics.imagingTarget.r_LP_P_Init = location
self.dynamics.instrument.nodeDataName = data_name
self.simpleInsControlConfig.imaged = 0
self.insControlConfig.imaged = 0
self.simulator.enableTask(self.LocPointTask.name + self.satellite.id)

@action
Expand All @@ -699,6 +700,85 @@ def action_downlink(self) -> None:
)


class ContinuousImagingFSWModel(ImagingFSWModel):
class LocPointTask(ImagingFSWModel.LocPointTask):
"""Task to point at targets and trigger the instrument"""

def create_module_data(self) -> None:
# Location pointing configuration
self.locPointConfig = (
self.fsw.locPointConfig
) = locationPointing.locationPointingConfig()
self.locPointWrap = (
self.fsw.locPointWrap
) = self.fsw.simulator.setModelDataWrap(self.locPointConfig)
self.locPointWrap.ModelTag = "locPoint"

# scanningInstrumentController configuration
self.insControlConfig = (
self.fsw.insControlConfig
) = scanningInstrumentController.scanningInstrumentControllerConfig()
self.insControlWrap = (
self.fsw.simpleInsControlWrap
) = self.fsw.simulator.setModelDataWrap(self.insControlConfig)
self.insControlWrap.ModelTag = "instrumentController"

@default_args(imageAttErrorRequirement=0.01, imageRateErrorRequirement=None)
def _set_instrument_controller(
self,
imageAttErrorRequirement: float,
imageRateErrorRequirement: float,
**kwargs,
) -> None:
"""Defines the instrument controller parameters.

Args:
imageAttErrorRequirement: Pointing attitude error tolerance for imaging
[MRP norm]
imageRateErrorRequirement: Rate tolerance for imaging. Disable with
None. [rad/s]
"""
self.insControlConfig.attErrTolerance = imageAttErrorRequirement
if imageRateErrorRequirement is not None:
self.insControlConfig.useRateTolerance = 1
self.insControlConfig.rateErrTolerance = imageRateErrorRequirement
self.insControlConfig.attGuidInMsg.subscribeTo(self.fsw.attGuidMsg)
self.insControlConfig.accessInMsg.subscribeTo(
self.fsw.dynamics.imagingTarget.accessOutMsgs[-1]
)

self._add_model_to_task(
self.insControlWrap, self.insControlConfig, priority=987
)

def reset_for_action(self) -> None:
self.instMsg = cMsgPy.DeviceCmdMsg_C()
self.instMsg.write(messaging.DeviceCmdMsgPayload())
self.fsw.dynamics.instrument.nodeStatusInMsg.subscribeTo(self.instMsg)
return super().reset_for_action()

@action
def action_nadir_scan(self) -> None:
"""Action scan nadir.

Args:
location: PCPF target location [m]
data_name: Data buffer to store image data to
"""
self.dynamics.instrument.nodeStatusInMsg.subscribeTo(
self.insControlConfig.deviceCmdOutMsg
)
self.insControlConfig.controllerStatus = 1
self.dynamics.instrumentPowerSink.powerStatus = 1
self.dynamics.imagingTarget.r_LP_P_Init = np.array([0, 0, 0.1])
self.dynamics.instrument.nodeDataName = "nadir"
self.simulator.enableTask(self.LocPointTask.name + self.satellite.id)

@action
def action_image(self, *args, **kwargs) -> None:
raise NotImplementedError("Use action_nadir_scan instead")


class SteeringFSWModel(BasicFSWModel):
"""FSW extending MRP control to use MRP steering instesd of MRP feedback."""

Expand Down
Loading