#37: Add control mixin and Controller interface #158
@armantekinalp Here is a first prototype of the controller interface. I mainly took the external forces mixin and adjusted it to the purposes of the control task. Can you please already have a look at the interface? The next step will be to devise an illustrative example. Do you guys have any nice ideas for simple control tasks involving at least two different systems? |
Codecov Report
@@ Coverage Diff @@
## update-0.3.0 #158 +/- ##
================================================
- Coverage 87.62% 86.74% -0.89%
================================================
Files 43 45 +2
Lines 2820 2866 +46
Branches 368 374 +6
================================================
+ Hits 2471 2486 +15
- Misses 328 359 +31
Partials 21 21
Continue to review full report at Codecov.
|
I updated the initial post of this PR with the example of a pendulum tracking a sphere. @skim0119 @armantekinalp From my side this PR is ready except for the missing tests. Can you please have a look at what might be missing in this PR, or whether you would like some of the implementation changed?
@skim0119 @armantekinalp @bhosale2 Do you have an update on reviewing the proposed mixin and interface? |
@mstoelzle Hi -- Sorry for taking some time to answer your question. Our original idea for |
@mstoelzle Hi -- Thank you for your patience. We have some criteria we would like to include on the controller. Combined with your example script above and your question on #37, we believe the feature of
Some concerns we have addressed with your current implementation are:
We have come up with a rough concept and interface to address the above, and we would like to know if this covers the features you want.

Interface

    controller = simulator.add_controller(
        # Active systems we control.
        # The purpose is to group, not to make rod1 and rod2 interact.
        # The control rule will be applied to every system specified.
        systems=[rod1, rod2],  # Union[List[SystemType], Dict[str, SystemType]]
    ).using(
        CustomController,
        passive_systems={"target": sphere1},  # Dict[str, SystemType]; provides read-only views
        action_shape=(17,),  # Optional[Tuple[int]]; external parameters
        step_skip=1000,  # int
    )

    # External access
    parameters = [np.ones(17,), np.zeros(17,)]
    controller.set_action(parameters)

Conceptually

    from typing import Dict, List, Union

    import numpy as np
    from elastica.typing import SystemType

    class ControllerBase:
        def get_state(self, tag: str, query: List[str]) -> Dict[str, np.ndarray]:
            state = {}
            for key in query:
                state[key] = self.passive_systems[tag][key]  # This will be read-only
            return state

        def set_action(self, ...):
            ...

    class CustomController(ControllerBase):
        def __init__(
            self,
            system: Union[SystemType, List[SystemType], Dict[str, SystemType]],
            passive_systems: Dict[str, SystemType],
            step_skip: int,
            *args,
            **kwargs
        ):
            super().__init__(
                systems=system,
                step_skip=step_skip,
                passive_systems=passive_systems,
                *args,
                **kwargs
            )

        def compute_force(self, my_velocity, target_velocity, action):
            ...

        def apply_forces(self, system: SystemType, action: np.ndarray, time: float):
            # get_state returns a dict keyed by the queried attribute names
            target_velocity = self.get_state("target", ["velocity_collection"])["velocity_collection"]
            system.external_forces += self.compute_force(
                system.velocity_collection, target_velocity, action)

        def apply_torques(self, system: SystemType, action: np.ndarray, time: float):
            ...

In the end, we believe the current limitation comes from the difficulty of handling multiple system instances at once, and this syntax should be able to resolve the issue. Some parts must be included in the backend wrapper, such as converting
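The read-only views mentioned for passive_systems could be sketched with plain NumPy as below. This is only an illustration under assumptions: DummySystem and read_only_state are hypothetical names that exist neither in this PR nor in PyElastica, and the sketch uses attribute lookup where the proposal above indexes systems like a dictionary.

```python
from typing import Dict, List

import numpy as np


class DummySystem:
    """Hypothetical stand-in for a simulated system (not a PyElastica class)."""

    def __init__(self) -> None:
        self.position_collection = np.zeros((3, 5))
        self.velocity_collection = np.ones((3, 5))


def read_only_state(system: object, query: List[str]) -> Dict[str, np.ndarray]:
    """Return non-writeable views of the requested array attributes."""
    state = {}
    for key in query:
        view = getattr(system, key).view()  # shares memory, no copy
        view.setflags(write=False)  # writing through the view raises ValueError
        state[key] = view
    return state


sphere = DummySystem()
state = read_only_state(sphere, ["velocity_collection"])

# The view follows in-place updates made by the owner of the array ...
sphere.velocity_collection[0, 0] = 2.0
assert state["velocity_collection"][0, 0] == 2.0

# ... but the controller cannot modify the passive system through it.
try:
    state["velocity_collection"][0, 0] = 5.0
except ValueError:
    print("write through read-only view rejected")
```

Only the view is marked non-writeable; the system's own array stays writeable, so the time-stepper would be unaffected.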
@skim0119 Thanks a lot for this proposal. While I do think it is already an improvement over the current options in the

Major comment

In many applications involving multi-body system control, we need to know the system state of multiple systems (this is possible in this proposal through the

Let's take the example of serial manipulators or parallel robots: here we usually jointly optimize the control inputs for all actuators controlling all links / joints. If I want to implement this following the proposed interface, I would need to add a separate controller for each joint. But as I have to optimize the entire serial manipulator, I would need to always optimize for all systems, and then only apply the control input for one joint at a time. This has two major disadvantages:
I am not sure what your reasoning is behind adding this strict requirement of applying the same control input to all active systems this controller is assigned to. At least from my perspective, this is unnecessarily limiting and restrictive. I see some potential benefits of distinguishing between

With respect to this (independent of the actions - see below), I would propose an implementation as follows:

    from typing import Dict, Optional

    import numpy as np
    from elastica.typing import SystemType

    class CustomController(ControllerBase):
        def __init__(
            self,
            active_systems: Dict[str, SystemType],
            step_skip: int,
            # Optional argument moved after step_skip: a parameter without a
            # default may not follow one with a default.
            passive_systems: Optional[Dict[str, SystemType]] = None,
            *args,
            **kwargs
        ):
            super().__init__(
                active_systems=active_systems,
                step_skip=step_skip,
                # Avoid a shared mutable default argument
                passive_systems=passive_systems if passive_systems is not None else {},
                *args,
                **kwargs
            )

        def compute_optimal_control_inputs(self, my_velocities, target_velocity, action):
            pass

        def apply_forces(self, active_systems: Dict[str, SystemType], action: np.ndarray, time: float):
            target_velocity = self.get_state("target", ["velocity_collection"])
            my_velocities = [system.velocity_collection for system in active_systems.values()]
            control_inputs = self.compute_optimal_control_inputs(
                my_velocities, target_velocity, action)
            # One control input per active system, in insertion order
            for active_system, control_input in zip(active_systems.values(), control_inputs):
                active_system.external_forces += control_input

        def apply_torques(self, active_systems: Dict[str, SystemType], action: np.ndarray, time: float):
            pass

Minor comment

I guess this would depend on the specific implementation, but would

Questions

I also have some questions concerning the proposed
I would be grateful if you could explain the concept behind these |
@mstoelzle Thank you for your response. I'll try to make some comments and answer some questions.

On Major Comments

I'm not quite sure if I understood your example case correctly, but I still think what you have described can be implemented with the previous schematic, maybe with some minor additional features.

Isn't this the whole purpose of having

Roughly,

    class ControllerBase:  # ABC or Protocol
        def pre_step(self, ...):  # Called before apply_forces/apply_torques
            ...

        def post_step(self, ...):  # Called after apply_forces/apply_torques
            ...

        def get_state(self, tag: str, query: List[str]) -> Dict[str, np.ndarray]:
            state = {}
            for key in query:
                state[key] = self.passive_systems[tag][key]  # This will be read-only
            return state

        def set_action(self, ...):
            ...

        ...

    class CustomController(ControllerBase):
        def pre_step(self):
            state: Dict[str, np.ndarray] = self.get_state(...)
            self.forces, self.torques = nb_control_law(state)

        def apply_forces(self, system: SystemType, action: np.ndarray, time: float):
            system.external_forces += self.forces[system]
I'm not sure what you mean. If you don't have conditioning, isn't it the same as applying the same law for each joint?
This is neither a restriction nor strict. It is up to users to write their own If users need an active system's state, they can also pass it in through the passive_system parameters.
What is the difference from having two separate controllers in this case? If the concern is some joint computation, can you maybe implement it using Maybe there is something I'm missing, but I believe we are getting closer to what is needed. Could you maybe provide some example scripts that you are having trouble with?

Regarding Minor Comment

Regarding Action

This is probably our original intention in making a controller, and it is a somewhat different purpose than what you have proposed. We thought that if this controller is to serve as an interface for more generalized robotics/optimization problems, we need an external online controller so the user can intervene within the simulation. This is why the concept goes along with the The idea is that our current implementation only supports passive or prescribed force/torque input. There are ways to change activation during the simulation, but they are not so clean and clear to use. Within this context, we thought the controller should act as variable lambdas within the simulator, exposed to the user and used by forcing/environment, such that the user can pass the activation. Just to answer your questions,
Some parameters are controllable by the user while the simulation is running.
(conceptually)
That way is also fine; I just thought having a setter method can include pre-check or callbacks. At this point, I don't see much difference between |
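To illustrate what a setter with a pre-check and callbacks might look like, here is a minimal sketch. ActionBuffer, on_change and the shape check are hypothetical and not part of the proposed interface; only the name set_action and the idea of a declared action shape are taken from the discussion above.

```python
from typing import Callable, List, Optional, Tuple

import numpy as np


class ActionBuffer:
    """Hypothetical holder for a user-set action (not PyElastica API)."""

    def __init__(self, action_shape: Tuple[int, ...]) -> None:
        self.action_shape = action_shape
        self._action: Optional[np.ndarray] = None
        self._callbacks: List[Callable[[np.ndarray], None]] = []

    def on_change(self, callback: Callable[[np.ndarray], None]) -> None:
        """Register a function to be called after every accepted action."""
        self._callbacks.append(callback)

    def set_action(self, action) -> None:
        action = np.asarray(action, dtype=float)
        # Pre-check: reject actions that do not match the declared shape.
        if action.shape != self.action_shape:
            raise ValueError(
                f"expected action of shape {self.action_shape}, got {action.shape}"
            )
        self._action = action.copy()  # decouple from the caller's array
        for callback in self._callbacks:
            callback(self._action)

    @property
    def action(self) -> Optional[np.ndarray]:
        """Most recently accepted action, or None if none was set yet."""
        return self._action


buffer = ActionBuffer(action_shape=(17,))
log: List[float] = []
buffer.on_change(lambda a: log.append(float(a.sum())))

buffer.set_action(np.ones(17))   # e.g. at one time-step
buffer.set_action(np.zeros(17))  # and at a later one
```

A plain attribute would also work, as noted above; the setter only adds a place to hang validation and notification.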
Regarding actions

Ok thanks, I understand the purpose a bit better then.
I don't understand why you are passing here a list of numpy arrays, and then

    controller.set_action(np.ones(17,))

and then maybe at another time-step:

    controller.set_action(np.zeros(17,))

@skim0119 It also needs to be made clear in

As a side comment connected to enabling the use of external online controllers: for these online controllers to work, it might be an important feature to specify at which real-time rate the simulator should run. Some robotics frameworks such as ROS run at a fixed clock, and any interface between PyElastica and ROS-based nodes will be very difficult to implement without knowing at which real-time rate (e.g. how many simulation time-steps per second of real time) the simulator is running.
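The real-time-rate idea can be sketched independently of any simulator as a paced loop. Everything here is an assumption for illustration: run_paced and real_time_factor are hypothetical names, and nothing in this thread says PyElastica offers such a feature.

```python
import time
from typing import Callable


def run_paced(
    step: Callable[[float], None],
    dt: float,
    n_steps: int,
    real_time_factor: float = 1.0,
    clock: Callable[[], float] = time.perf_counter,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Call step(sim_time) n_steps times, sleeping so that simulated time
    advances real_time_factor times as fast as wall-clock time."""
    start = clock()
    for i in range(n_steps):
        step(i * dt)
        # Wall-clock time at which step i + 1 is due.
        target = start + (i + 1) * dt / real_time_factor
        remaining = target - clock()
        if remaining > 0.0:
            sleep(remaining)
        # Otherwise the step took longer than its time budget and the loop
        # falls behind the requested rate without sleeping.


# 20 steps of 1 ms simulated time at real-time rate: roughly 20 ms of wall time.
visited = []
run_paced(visited.append, dt=1e-3, n_steps=20, real_time_factor=1.0)
```

For the very small time-steps typical of rod simulations, sleeping after every step is unlikely to be accurate, so one would probably pace in chunks of many steps. The clock and sleep arguments are injectable mainly so that the pacing logic can be checked without waiting.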
@mstoelzle will it be enough to pass time-step of simulation to the ControllerClass |
I passed the list of arrays, thinking first
The parameter passed to apply_forces/apply_torques will be passed from backend, and previously set action will be passed. It is optional, allowing
Yes, I agree, and it is definitely one of the reasons why this |
Here the idea is there might be more than one active system in the simulator and we pass a list containing actions for each rod. @skim0119 can confirm |
Ok, I get what you mean and I am supportive of this purpose. However, this is really inconsistent:
This kind of doesn't make sense. I would therefore suggest to remove the requirement that each active system needs to have the same |
Regarding the major comment
@skim0119 Ok, let us consider a planar serial manipulator (e.g. a robotic arm) consisting of three (rigid) links and three joints. Let's also assume that we have already trained an RL controller with an observation space of 18 (e.g. full position and velocity of each link) and an actuation space of 3 (e.g. torque to be applied to each joint). This requires knowledge of the states of all three systems (e.g. links), which is given by your proposal, but also the actuation of three systems, as the RL controller returns 3 torques. Now, we need to implement the following steps:
If I want to implement this according to your proposal for the controller interface, I would need to do it like this:
In this example, we had to evaluate the RL controller three times instead of just once, because of the requirements of the controller interface. If we could define multiple
I don't think this is true. As
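The cost argument (three policy evaluations versus one) can be made concrete with a toy sketch. CountingPolicy and both helper functions are hypothetical; the 18-to-3 mapping is a dummy and does not represent a trained RL model.

```python
from typing import List

import numpy as np


class CountingPolicy:
    """Hypothetical stand-in for a trained policy: 18 observations -> 3 torques."""

    def __init__(self) -> None:
        self.n_calls = 0

    def predict(self, observation: np.ndarray) -> np.ndarray:
        self.n_calls += 1
        # Dummy mapping: sum the 6 state entries belonging to each link.
        return observation.reshape(3, 6).sum(axis=1)


def torques_one_controller_per_link(
    policy: CountingPolicy, observation: np.ndarray, n_links: int = 3
) -> List[float]:
    """Each per-link controller evaluates the full policy and keeps one entry."""
    return [float(policy.predict(observation)[i]) for i in range(n_links)]


def torques_joint_controller(
    policy: CountingPolicy, observation: np.ndarray
) -> List[float]:
    """A single evaluation whose outputs are distributed to all links."""
    return [float(torque) for torque in policy.predict(observation)]


observation = np.arange(18, dtype=float)
per_link_policy, joint_policy = CountingPolicy(), CountingPolicy()

per_link = torques_one_controller_per_link(per_link_policy, observation)
joint = torques_joint_controller(joint_policy, observation)

assert np.allclose(per_link, joint)  # same torques either way
print(per_link_policy.n_calls, joint_policy.n_calls)  # prints "3 1"
```

Both variants yield the same torques; they differ only in how often the policy is evaluated per control step.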
@mstoelzle -- Something along this line?

    from typing import Protocol

    import numpy as np
    import tensorflow as tf

    class ModelProtocol(Protocol):
        def predict(self, X: np.ndarray) -> np.ndarray:
            ...

    class SerialManipulatorController(ControllerBase):
        def __init__(self, model: ModelProtocol, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.model = model
            self.z = np.array([0., 0., 1.])

        def _convert_state_array(self, state) -> np.ndarray:
            ...

        def pre_step(self, active_system, passive_system, action, time):
            state = self.get_state(0, ['position_collection', 'velocity_collection'])
            state = self._convert_state_array(state)
            actuations = self.model.predict(state)  # Assuming in-order of links
            # np.ndarray has no append(); pad with 0 so the last link's end torque is zero
            actuations = np.append(actuations, 0.0)
            self.base_torques, self.end_torques = {}, {}
            for idx, system in enumerate(active_system):
                self.base_torques[system] = actuations[idx] * self.z
                self.end_torques[system] = -actuations[idx+1] * self.z

        def apply_torques(self, system: SystemType, action: np.ndarray, time: float):
            system.external_torques[..., 0] += self.base_torques[system]
            system.external_torques[..., -1] += self.end_torques[system]

    link1: SystemType = CosseratRod(...)
    link2: SystemType = CosseratRod(...)
    link3: SystemType = CosseratRod(...)

    path: str = "<model path>"
    RL_Model = tf.keras.models.load_model(path)

    simulator.add_controller(
        [link1, link2, link3]
    ).using(
        SerialManipulatorController,
        model=RL_Model,  # ModelProtocol
        passive_systems=[link1, link2, link3],
        step_skip=1000,
    )
@skim0119 Thanks for going through the effort of coding the implementation of this example :) I think this would work and cover the technical needs of my project. From a code-style and UI perspective, I still think it is a bit overly complicated and could be implemented in a lighter way. I still don't quite understand what your motivation is for implementing it this way. What is the benefit of adding this
If the reason is that the users are likely expecting a specific kind of syntax for

    def run_controller(self, active_systems: Dict[str, SystemType], actions: Optional[Dict[str, np.ndarray]], time: float):
        pass
Our focus is more geared towards delivering a "generalizable" solver, and this is prioritized over a "clean-looking" solver. We want this feature to benefit other collaborators' projects and any future projects as well, which means we want the design of the pipeline/functionals to be modularized and application-agnostic.
Your We don't want to change the structure of |
Closing it for now due to its inactive status. @armantekinalp we can re-open it in the future if work on this resumes again. |
Motivation

Many use cases of PyElastica involve prototyping controllers. Often that involves knowing the state of multiple systems, or even the state of the entire simulator, and using this state information to apply appropriate (external) forces and torques to the various systems to achieve a certain control task.

Considered alternatives

Using the external forcing mixins is insufficient for these use cases, as it only allows knowledge of one system state at a time and, analogously, only one system can be controlled at a time. To circumvent these issues, complex and custom workarounds would need to be applied, involving multiprocessing or pointers to persistent memory of system objects.

Usage

Devise a custom controller:
Apply controller to simulator:

Example
I added the example of a pendulum tracking a sphere to the ControllerCases in pendulum_tracking_sphere.py.

Here, we initialize a (planar) pendulum as a cylinder in PyElastica, which is only allowed to rotate around the z-axis. The pendulum can be actuated by applying a torque around the z-axis in the inertial frame to the pendulum. We additionally simulate a sphere. This sphere moves in a circular trajectory around the z-axis at a fixed angular velocity, which is unknown to the controller. Now, we construct the PendulumTrackingController as a PID controller tracking the movement of the sphere. We compute the error between the polar angle of the sphere and the polar angle of the pendulum to output a torque acting on the pendulum to track the sphere as closely as possible:

pendulum_tracking_sphere_example_xy.mp4
pendulum_tracking_sphere_example.mp4
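The control law described above can be sketched without PyElastica as a PID loop on the wrapped polar-angle error. This is not the PendulumTrackingController from the PR: the gains, the unit inertia, the absence of gravity and damping, and the simple time integration are all assumptions chosen for illustration.

```python
import numpy as np


def polar_angle(position: np.ndarray) -> float:
    """Angle of a point in the x-y plane, measured from the x-axis."""
    return float(np.arctan2(position[1], position[0]))


def wrap_angle(angle: float) -> float:
    """Wrap an angle to [-pi, pi) so the error takes the short way around."""
    return float((angle + np.pi) % (2.0 * np.pi) - np.pi)


class PID:
    def __init__(self, kp: float, ki: float, kd: float) -> None:
        self.kp, self.ki, self.kd = kp, ki, kd
        self.integral = 0.0
        self.prev_error = None

    def __call__(self, error: float, dt: float) -> float:
        self.integral += error * dt
        if self.prev_error is None:
            derivative = 0.0  # no previous sample on the first call
        else:
            derivative = wrap_angle(error - self.prev_error) / dt
        self.prev_error = error
        return self.kp * error + self.ki * self.integral + self.kd * derivative


def simulate(duration: float = 10.0, dt: float = 1e-3,
             sphere_rate: float = 1.0, inertia: float = 1.0) -> float:
    """Track a sphere on a unit circle; return the final absolute angle error."""
    pid = PID(kp=100.0, ki=200.0, kd=20.0)  # assumed gains, not from the PR
    theta, theta_dot = 0.0, 0.0  # pendulum angle and angular velocity
    error = 0.0
    for k in range(int(round(duration / dt))):
        t = k * dt
        # The controller only sees positions, never sphere_rate itself.
        sphere = np.array([np.cos(sphere_rate * t), np.sin(sphere_rate * t), 0.0])
        tip = np.array([np.cos(theta), np.sin(theta), 0.0])
        error = wrap_angle(polar_angle(sphere) - polar_angle(tip))
        torque = pid(error, dt)  # torque about the z-axis
        theta_dot += torque / inertia * dt  # semi-implicit Euler update
        theta += theta_dot * dt
    return abs(error)


final_error = simulate()
print(f"final tracking error: {final_error:.2e} rad")
```

Wrapping the error keeps the pendulum from unwinding a full turn when the sphere's polar angle jumps from pi to -pi.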
This example nicely illustrates the functionality of the new control interface, as this control problem requires access to the state of two simulated systems to compute appropriate forces / torques.

TODOs

Related to: #37