From 5c8a9d8a1956592ee754895dfb4cc3c67f156cb3 Mon Sep 17 00:00:00 2001
From: Alexis Duburcq <>
Date: Mon, 2 Dec 2024 22:52:18 +0100
Subject: [PATCH] [gym_jiminy/common] Add estimated IMU angular velocity to
 Mahony filter observation.

 .../common/blocks/    | 26 ++++++--
 .../gym_jiminy/common/blocks/ | 60 +++++++++++--------
 2 files changed, 57 insertions(+), 29 deletions(-)

diff --git a/python/gym_jiminy/common/gym_jiminy/common/blocks/ b/python/gym_jiminy/common/gym_jiminy/common/blocks/
index f82d2e7ef..abb4babf6 100644
--- a/python/gym_jiminy/common/gym_jiminy/common/blocks/
+++ b/python/gym_jiminy/common/gym_jiminy/common/blocks/
@@ -3,7 +3,7 @@
 from collections import OrderedDict
 from import Mapping
-from typing import List, Dict, Sequence, Tuple, Optional
+from typing import List, Dict, Sequence, Tuple, Union, Optional
 import numpy as np
 import numba as nb
@@ -16,6 +16,7 @@
 import pinocchio as pin
 from ..bases import BaseAct, BaseObs, BaseObserverBlock, InterfaceJiminyEnv
+from ..wrappers.observation_layout import NestedData
 from ..utils import (DataNested,
@@ -508,7 +509,7 @@ def __init__(self,
                  imu_frame_names: Sequence[str],
                  flex_frame_names: Sequence[str],
                  ignore_twist: bool = True,
-                 nested_imu_key: Sequence[str] = (
+                 nested_imu_key: NestedData = (
                     "features", "mahony_filter", "quat"),
                  compute_rpy: bool = True,
                  update_ratio: int = 1) -> None:
@@ -649,11 +650,26 @@ def __init__(self,
             self._is_chain_orphan.append((is_parent_orphan, is_child_orphan))
-        # Define IMU and encoder measurement proxy for fast access
+        # Define IMU observation proxy for fast access
         obs_imu_quats: DataNested = env.observation
         for key in nested_imu_key:
-            assert isinstance(obs_imu_quats, Mapping)
-            obs_imu_quats = obs_imu_quats[key]
+            if isinstance(key, str):
+                assert isinstance(obs_imu_quats, Mapping)
+                obs_imu_quats = obs_imu_quats[key]
+            elif isinstance(key, int):
+                assert isinstance(obs_imu_quats, Sequence)
+                obs_imu_quats = obs_imu_quats[key]
+            else:
+                assert isinstance(obs_imu_quats, np.ndarray)
+                slices: List[Union[int, slice]] = []
+                for start_end in key:
+                    if isinstance(start_end, int):
+                        slices.append(start_end)
+                    elif not start_end:
+                        slices.append(slice(None,))
+                    else:
+                        slices.append(slice(*start_end))
+                obs_imu_quats = obs_imu_quats[tuple(slices)]
         assert isinstance(obs_imu_quats, np.ndarray)
         self._obs_imu_quats = obs_imu_quats
diff --git a/python/gym_jiminy/common/gym_jiminy/common/blocks/ b/python/gym_jiminy/common/gym_jiminy/common/blocks/
index fde3a55ef..6f9f77d96 100644
--- a/python/gym_jiminy/common/gym_jiminy/common/blocks/
+++ b/python/gym_jiminy/common/gym_jiminy/common/blocks/
@@ -162,15 +162,16 @@ class MahonyFilter(BaseObserverBlock[
     .. note::
         The feature space of this observer is a dictionary storing quaternion
-        estimates under key 'quat', and optionally, their corresponding
-        Yaw-Pitch-Roll Euler angles representations under key 'key' if
-        `compute_rpy=True`. Both leaves are 2D array of shape (N, M), where
-        N is the number of components of the representation while M is the
-        number of IMUs of the robot. More specifically, `N=4` for quaternions
-        (Qx, Qy, Qz, Qw) and 'N=3' for Euler angles (Roll-Pitch-Yaw). Indeed,
-        the Yaw angle of the Yaw-Pitch-Roll Euler angles representations is
-        systematically included in the feature space, even if its value is
-        meaningless, i.e. `ignore_twist=True`.
+        estimates under key 'quat', optionally, their corresponding
+        Yaw-Pitch-Roll Euler angles representations under key 'rpy' if
+        `compute_rpy=True`, and finally, the angular velocity in local frame
+        estimates under key 'omega'. Both leaves are 2D array of shape (N, M),
+        where N is the number of components of the representation while M is
+        the number of IMUs of the robot. Specifically, `N=4` for quaternions
+        (Qx, Qy, Qz, Qw), 'N=3' for both the Euler angles (Roll-Pitch-Yaw) and
+        the angular velocity. The Yaw angle of the Yaw-Pitch-Roll Euler angles
+        representations is systematically included in the feature space, even
+        if its value is meaningless, i.e. `ignore_twist=True`.
     .. warning::
         This filter works best for 'observe_dt' smaller or equal to 5ms. Its
@@ -186,7 +187,7 @@ def __init__(self,
                  exact_init: bool = True,
                  kp: Union[np.ndarray, float] = 1.0,
                  ki: Union[np.ndarray, float] = 0.1,
-                 compute_rpy: bool = True,
+                 compute_rpy: bool = False,
                  update_ratio: int = 1) -> None:
         :param name: Name of the block.
@@ -205,14 +206,14 @@ def __init__(self,
                            is not recommended because the robot is often
                            free-falling at init, which is not realistic anyway.
                            Optional: `True` by default.
-        :param mahony_kp: Proportional gain used for gyro-accel sensor fusion.
-                          Set it to 0.0 to use only the gyroscope. In such a
-                          case, the orientation estimate would be exact if the
-                          sensor is bias- and noise-free, and the update period
-                          matches the simulation integrator timestep.
-                          Optional: `1.0` by default.
-        :param mahony_ki: Integral gain used for gyro bias estimate.
-                          Optional: `0.1` by default.
+        :param kp: Proportional gain used for gyro-accel sensor fusion. Set it
+                   to 0.0 to use only the gyroscope. In such a case, the
+                   orientation estimate would be exact if the sensor is bias-
+                   and noise-free, and the update period matches the
+                   simulation integrator timestep.
+                   Optional: `1.0` by default.
+        :param ki: Integral gain used for gyro bias estimate.
+                   Optional: `0.1` by default.
         :param compute_rpy: Whether to compute the Yaw-Pitch-Roll Euler angles
                             representation for the 3D orientation of the IMU,
                             in addition to the quaternion representation.
@@ -279,9 +280,6 @@ def __init__(self,
         if self._update_twist:
             self._state["twist"] = self._twist
-        # Store the estimate angular velocity to avoid redundant computations
-        self._omega = np.zeros((3, num_imu_sensors))
         # Initialize the observer
         super().__init__(name, env, update_ratio)
@@ -291,6 +289,7 @@ def __init__(self,
             self._rpy = self.observation["rpy"]
             self._rpy = np.array([])
+        self._omega = self.observation["omega"]
     def _initialize_state_space(self) -> None:
         """Configure the internal state space of the observer.
@@ -340,6 +339,11 @@ def _initialize_observation_space(self) -> None:
                 low=-high[:, np.newaxis].repeat(num_imu_sensors, axis=1),
                 high=high[:, np.newaxis].repeat(num_imu_sensors, axis=1),
+        observation_space["omega"] = gym.spaces.Box(
+            low=float('-inf'),
+            high=float('inf'),
+            shape=(3, num_imu_sensors),
+            dtype=np.float64)
         self.observation_space = gym.spaces.Dict(observation_space)
     def _setup(self) -> None:
@@ -390,8 +394,11 @@ def fieldnames(self) -> Dict[str, List[List[str]]]:
         imu_sensors = self.env.robot.sensors[ImuSensor.type]
         fieldnames: Dict[str, List[List[str]]] = {}
         fieldnames["quat"] = [
-            [f"{}.Quat{e}" for sensor in imu_sensors]
-            for e in ("x", "y", "z", "w")]
+            [".".join((, f"Quat{e}")) for sensor in imu_sensors]
+            for e in ("X", "Y", "Z", "W")]
+        fieldnames["omega"] = [
+            [".".join((, e)) for sensor in imu_sensors]
+            for e in ("X", "Y", "Z")]
         if self.compute_rpy:
             fieldnames["rpy"] = [
                 [".".join((, e)) for sensor in imu_sensors]
@@ -425,7 +432,7 @@ def refresh_observation(self, measurement: BaseObs) -> None:
                     rot = robot.pinocchio_data.oMf[sensor.frame_index].rotation
-                # Convert it to quaternions
+                # Convert the rotation matrices of the IMUs to quaternions
                 matrices_to_quat(tuple(imu_rots), self._quat)
                 # Keep track of tilt if necessary
@@ -433,6 +440,11 @@ def refresh_observation(self, measurement: BaseObs) -> None:
                     self._twist[:] = np.arctan2(self._quat[2], self._quat[3])
                 self._is_initialized = True
+            # Compute the RPY representation if requested
+            if self.compute_rpy:
+                quat_to_rpy(self._quat, self._rpy)
         # Run an iteration of the filter, computing the next state estimate