Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[gym_jiminy/common] Add 'BodyObserver' to get orientation and angular velocity of IMUs' parent bodies. #844

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Body Orientation State Observation
==================================

.. automodule:: gym_jiminy.common.blocks.body_orientation_observer
:members:
:undoc-members:
:show-inheritance:
2 changes: 2 additions & 0 deletions docs/api/gym_jiminy/common/blocks/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ Blocks
proportional_derivative_controller
motor_safety_limit
mahony_filter
body_orientation_observer
deformation_estimator
quantity_observer
7 changes: 7 additions & 0 deletions docs/api/gym_jiminy/common/blocks/quantity_observer.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Generic Quantity Observer
=========================

.. automodule:: gym_jiminy.common.blocks.quantity_observer
:members:
:undoc-members:
:show-inheritance:
10 changes: 6 additions & 4 deletions python/gym_jiminy/common/gym_jiminy/common/blocks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# pylint: disable=missing-module-docstring

from .quantity_observer import QuantityObserver
from .mahony_filter import MahonyFilter
from .motor_safety_limit import MotorSafetyLimit
from .proportional_derivative_controller import PDController, PDAdapter
from .quantity_observer import QuantityObserver
from .mahony_filter import MahonyFilter
from .body_orientation_observer import BodyObserver
from .deformation_estimator import DeformationEstimator


__all__ = [
'QuantityObserver',
'MahonyFilter',
'MotorSafetyLimit',
'PDController',
'PDAdapter',
'QuantityObserver',
'MahonyFilter',
'BodyObserver',
'DeformationEstimator'
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""Implementation of a stateless body orientation state estimator block
compatible with gym_jiminy reinforcement learning pipeline environment design.
"""
from collections import OrderedDict
from collections.abc import Mapping
from typing import List, Dict, Sequence, Union

import numpy as np
import gymnasium as gym

from jiminy_py.core import ImuSensor # pylint: disable=no-name-in-module

from ..bases import BaseAct, BaseObs, BaseObserverBlock, InterfaceJiminyEnv
from ..wrappers.observation_layout import NestedData
from ..utils import (DataNested,
quat_to_rpy,
matrices_to_quat,
quat_multiply,
quat_apply,
remove_twist_from_quat)


class BodyObserver(BaseObserverBlock[
Dict[str, np.ndarray], np.ndarray, BaseObs, BaseAct]):
"""Compute the orientation and angular velocity in local frame of the
parent body associated with all the IMU sensors of the robot.
"""
def __init__(self,
name: str,
env: InterfaceJiminyEnv[BaseObs, BaseAct],
*,
nested_imu_quat_key: NestedData = (
"features", "mahony_filter", "quat"),
nested_imu_omega_key: NestedData = (
"features", "mahony_filter", "omega",),
ignore_twist: bool = False,
compute_rpy: bool = True,
update_ratio: int = 1) -> None:
"""
:param name: Name of the block.
:param env: Environment to connect with.
:param nested_imu_quat_key: Nested key from environment observation
mapping to the IMU quaternion estimates.
Their ordering must be consistent with the
true IMU sensors of the robot.
:param nested_imu_omega_key: Nested key from environment observation
mapping to the IMU angular velocity in
local frame estimates. Their ordering must
be consistent with the true IMU sensors of
the robot.
:param ignore_twist: Whether to ignore the twist of the IMU quaternion
estimate.
:param compute_rpy: Whether to compute the Yaw-Pitch-Roll Euler angles
representation for the 3D orientation of the IMU,
in addition to the quaternion representation.
Optional: True by default.
:param update_ratio: Ratio between the update period of the observer
and the one of the subsequent observer. -1 to
match the simulation timestep of the environment.
Optional: `1` by default.
"""
# Backup some of the user-argument(s)
self.ignore_twist = ignore_twist
self.compute_rpy = compute_rpy

# Define observed / estimated IMU data proxies for fast access
obs_imu_data_list: List[np.ndarray] = []
for nested_imu_key in (nested_imu_quat_key, nested_imu_omega_key):
obs_imu_data: DataNested = env.observation
for key in nested_imu_key:
if isinstance(key, str):
assert isinstance(obs_imu_data, Mapping)
obs_imu_data = obs_imu_data[key]
elif isinstance(key, int):
assert isinstance(obs_imu_data, Sequence)
obs_imu_data = obs_imu_data[key]
else:
assert isinstance(obs_imu_data, np.ndarray)
slices: List[Union[int, slice]] = []
for start_end in key:
if isinstance(start_end, int):
slices.append(start_end)
elif not start_end:
slices.append(slice(None,))
else:
slices.append(slice(*start_end))
obs_imu_data = obs_imu_data[tuple(slices)]
assert isinstance(obs_imu_data, np.ndarray)
obs_imu_data_list.append(obs_imu_data)
self._obs_imu_quats, self._obs_imu_omegas = obs_imu_data_list

# Extract the relative IMU frame placement wrt its parent body
imu_rel_rot_mats: List[np.ndarray] = []
for sensor in env.robot.sensors[ImuSensor.type]:
frame = env.robot.pinocchio_model.frames[sensor.frame_index]
imu_rel_rot_mats.append(frame.placement.rotation)
self._imu_rel_quats = matrices_to_quat(tuple(imu_rel_rot_mats))

# Initialize the observer
super().__init__(name, env, update_ratio)

# Define proxies for the body orientations and angular velocities
self._quat = self.observation["quat"]
if self.compute_rpy:
self._rpy = self.observation["rpy"]
else:
self._rpy = np.array([])
self._omega = self.observation["omega"]

def _initialize_observation_space(self) -> None:
num_imu_sensors = len(self.env.robot.sensors[ImuSensor.type])
observation_space: Dict[str, gym.Space] = OrderedDict()
observation_space["quat"] = gym.spaces.Box(
low=np.full((4, num_imu_sensors), -1.0 - 1e-9),
high=np.full((4, num_imu_sensors), 1.0 + 1e-9),
dtype=np.float64)
if self.compute_rpy:
high = np.array([np.pi, np.pi/2, np.pi]) + 1e-9
observation_space["rpy"] = gym.spaces.Box(
low=-high[:, np.newaxis].repeat(num_imu_sensors, axis=1),
high=high[:, np.newaxis].repeat(num_imu_sensors, axis=1),
dtype=np.float64)
observation_space["omega"] = gym.spaces.Box(
low=float('-inf'),
high=float('inf'),
shape=(3, num_imu_sensors),
dtype=np.float64)
self.observation_space = gym.spaces.Dict(observation_space)

def _setup(self) -> None:
# Call base implementation
super()._setup()

# Fix initialization of the observation to be valid quaternions
self._quat[-1] = 1.0

@property
def fieldnames(self) -> Dict[str, List[List[str]]]:
imu_sensors = self.env.robot.sensors[ImuSensor.type]
fieldnames: Dict[str, List[List[str]]] = {}
fieldnames["quat"] = [
[".".join((sensor.name, f"Quat{e}")) for sensor in imu_sensors]
for e in ("X", "Y", "Z", "W")]
if self.compute_rpy:
fieldnames["rpy"] = [
[".".join((sensor.name, e)) for sensor in imu_sensors]
for e in ("Roll", "Pitch", "Yaw")]
fieldnames["omega"] = [
[".".join((sensor.name, e)) for sensor in imu_sensors]
for e in ("X", "Y", "Z")]
return fieldnames

def refresh_observation(self, measurement: BaseObs) -> None:
# Compute the parent body orientations
quat_multiply(self._obs_imu_quats,
self._imu_rel_quats,
out=self._quat,
is_right_conjugate=True)

# Remove twist if requested
if self.ignore_twist:
remove_twist_from_quat(self._quat)

# Compute the parent body angular velocities in local frame.
# Note that batched "quaternion apply" is faster than sequential
# "rotation matrix apply" in practice, both when using `np.einsum` or
# single-threaded numba jitted implementation.
quat_apply(self._imu_rel_quats,
self._obs_imu_omegas,
out=self._omega)

# Compute the RPY representation if requested
if self.compute_rpy:
quat_to_rpy(self._quat, self._rpy)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
from collections import OrderedDict
from collections.abc import Mapping
from typing import List, Dict, Sequence, Tuple, Optional
from typing import List, Dict, Sequence, Tuple, Union, Optional

import numpy as np
import numba as nb
Expand All @@ -16,6 +16,7 @@
import pinocchio as pin

from ..bases import BaseAct, BaseObs, BaseObserverBlock, InterfaceJiminyEnv
from ..wrappers.observation_layout import NestedData
from ..utils import (DataNested,
quat_to_rpy,
matrices_to_quat,
Expand Down Expand Up @@ -508,7 +509,7 @@ def __init__(self,
imu_frame_names: Sequence[str],
flex_frame_names: Sequence[str],
ignore_twist: bool = True,
nested_imu_key: Sequence[str] = (
nested_imu_key: NestedData = (
"features", "mahony_filter", "quat"),
compute_rpy: bool = True,
update_ratio: int = 1) -> None:
Expand Down Expand Up @@ -649,11 +650,26 @@ def __init__(self,
self._is_flex_flipped.append(np.array(is_flipped))
self._is_chain_orphan.append((is_parent_orphan, is_child_orphan))

# Define IMU and encoder measurement proxy for fast access
# Define IMU observation proxy for fast access
obs_imu_quats: DataNested = env.observation
for key in nested_imu_key:
assert isinstance(obs_imu_quats, Mapping)
obs_imu_quats = obs_imu_quats[key]
if isinstance(key, str):
assert isinstance(obs_imu_quats, Mapping)
obs_imu_quats = obs_imu_quats[key]
elif isinstance(key, int):
assert isinstance(obs_imu_quats, Sequence)
obs_imu_quats = obs_imu_quats[key]
else:
assert isinstance(obs_imu_quats, np.ndarray)
slices: List[Union[int, slice]] = []
for start_end in key:
if isinstance(start_end, int):
slices.append(start_end)
elif not start_end:
slices.append(slice(None,))
else:
slices.append(slice(*start_end))
obs_imu_quats = obs_imu_quats[tuple(slices)]
assert isinstance(obs_imu_quats, np.ndarray)
self._obs_imu_quats = obs_imu_quats

Expand Down
Loading