Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Goal reached collision avoidance update #132

Merged
merged 7 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 137 additions & 5 deletions src/giskardpy/goals/collision_avoidance.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections import defaultdict
from copy import deepcopy
from typing import Dict, Optional, List
import giskardpy.casadi_wrapper as cas
import giskardpy.utils.tfwrapper as tf
Expand All @@ -8,6 +10,7 @@
from giskardpy.god_map import god_map
from giskardpy.my_types import my_string
from giskardpy.symbol_manager import symbol_manager
from giskardpy.utils import logging


class ExternalCollisionAvoidance(Goal):
Expand All @@ -17,6 +20,7 @@ def __init__(self,
robot_name: str,
max_velocity: float = 0.2,
hard_threshold: float = 0.0,
name_prefix: Optional[str] = None,
soft_thresholds: Optional[Dict[my_string, float]] = None,
idx: int = 0,
num_repeller: int = 1,
Expand All @@ -33,7 +37,7 @@ def __init__(self,
self.num_repeller = num_repeller
self.link_name = link_name
self.idx = idx
name = f'{self.__class__.__name__}/{self.link_name}/{self.idx}'
name = f'{name_prefix}/{self.__class__.__name__}/{self.link_name}/{self.idx}'
super().__init__(name)
self.root = god_map.world.root_link_name
self.robot_name = robot_name
Expand Down Expand Up @@ -88,7 +92,7 @@ def __init__(self,

weight = cas.save_division(WEIGHT_COLLISION_AVOIDANCE, # divide by number of active repeller per link
cas.min(number_of_external_collisions, self.num_repeller))
distance_monitor = Monitor(f'collision distance {self.link_name}/{self.idx}', crucial=False)
distance_monitor = Monitor(f'collision distance {self.name}', crucial=False)
distance_monitor.set_expression(cas.less(actual_distance, 50))
self.add_monitor(distance_monitor)
task = Task('stay away')
Expand All @@ -101,6 +105,7 @@ def __init__(self,
lower_slack_limit=-float('inf'),
upper_slack_limit=upper_slack)
self.add_task(task)
self.connect_monitors_to_all_tasks(to_start, to_hold, to_end)

def map_V_n_symbol(self):
expr = f'god_map.closest_point.get_external_collisions(\'{self.link_name}\')[{self.idx}].map_V_n'
Expand Down Expand Up @@ -138,6 +143,7 @@ def __init__(self,
soft_threshold: float = 0.05,
idx: float = 0,
num_repeller: int = 1,
name_prefix: Optional[str] = None,
to_start: Optional[List[Monitor]] = None,
to_hold: Optional[List[Monitor]] = None,
to_end: Optional[List[Monitor]] = None):
Expand All @@ -150,7 +156,7 @@ def __init__(self,
self.idx = idx
if self.link_a.prefix != self.link_b.prefix:
raise Exception(f'Links {self.link_a} and {self.link_b} have different prefix.')
name = f'{self.__class__.__name__}/{self.link_a}/{self.link_b}/{self.idx}'
name = f'{name_prefix}/{self.__class__.__name__}/{self.link_a}/{self.link_b}/{self.idx}'
super().__init__(name)
self.root = god_map.world.root_link_name
self.robot_name = robot_name
Expand Down Expand Up @@ -197,7 +203,7 @@ def __init__(self,

weight = cas.save_division(WEIGHT_COLLISION_AVOIDANCE, # divide by number of active repeller per link
cas.min(number_of_self_collisions, self.num_repeller))
distance_monitor = Monitor(f'collision distance {self.link_a}/{self.link_b}/{self.idx}', crucial=False)
distance_monitor = Monitor(f'collision distance {self.name}', crucial=False)
distance_monitor.set_expression(cas.less(actual_distance, 50))
god_map.debug_expression_manager.add_debug_expression(f'distance {str(self)}', actual_distance)
self.add_monitor(distance_monitor)
Expand All @@ -211,6 +217,7 @@ def __init__(self,
lower_slack_limit=-float('inf'),
upper_slack_limit=upper_slack)
self.add_task(task)
self.connect_monitors_to_all_tasks(to_start, to_hold, to_end)

def get_contact_normal_in_b(self):
expr = f'god_map.closest_point.get_self_collisions(\'{self.link_a}\', \'{self.link_b}\')[{self.idx}].new_b_V_n'
Expand Down Expand Up @@ -336,9 +343,134 @@ def get_link_b(self):

class CollisionAvoidance(Goal):
def __init__(self,
name: str,
collision_entries: List[CollisionEntry],
name: Optional[str] = None,
to_start: Optional[List[Monitor]] = None,
to_hold: Optional[List[Monitor]] = None,
to_end: Optional[List[Monitor]] = None):
if name is None:
name = self.__class__.__name__
super().__init__(name)
self.to_start = to_start
self.to_hold = to_hold
self.to_end = to_end
self.collision_matrix = god_map.collision_scene.create_collision_matrix(deepcopy(collision_entries))
if not collision_entries or not god_map.collision_scene.is_allow_all_collision(collision_entries[-1]):
self.add_external_collision_avoidance_constraints(
soft_threshold_override=self.collision_matrix)
if not collision_entries or (not god_map.collision_scene.is_allow_all_collision(collision_entries[-1]) and
not god_map.collision_scene.is_allow_all_self_collision(collision_entries[-1])):
self.add_self_collision_avoidance_constraints()
if to_start:
def cb():
god_map.collision_scene.set_collision_matrix(self.collision_matrix)
god_map.collision_scene.reset_cache()
god_map.monitor_manager.register_monitor_cb(to_start, cb)
else:
god_map.collision_scene.set_collision_matrix(self.collision_matrix)

def _task_sanity_check(self):
pass

@profile
def add_external_collision_avoidance_constraints(self, soft_threshold_override=None):
configs = god_map.collision_scene.collision_avoidance_configs
fixed_joints = god_map.collision_scene.fixed_joints
joints = [j for j in god_map.world.controlled_joints if j not in fixed_joints]
num_constrains = 0
for joint_name in joints:
try:
robot_name = god_map.world.get_group_of_joint(joint_name).name
except KeyError:
child_link = god_map.world.joints[joint_name].child_link_name
robot_name = god_map.world._get_group_name_containing_link(child_link)
child_links = god_map.world.get_directly_controlled_child_links_with_collisions(joint_name, fixed_joints)
if child_links:
number_of_repeller = configs[robot_name].external_collision_avoidance[joint_name].number_of_repeller
for i in range(number_of_repeller):
child_link = god_map.world.joints[joint_name].child_link_name
hard_threshold = configs[robot_name].external_collision_avoidance[joint_name].hard_threshold
if soft_threshold_override is not None:
soft_threshold = soft_threshold_override
else:
soft_threshold = configs[robot_name].external_collision_avoidance[joint_name].soft_threshold
self.add_constraints_of_goal(ExternalCollisionAvoidance(robot_name=robot_name,
link_name=child_link,
name_prefix=self.name,
hard_threshold=hard_threshold,
soft_thresholds=soft_threshold,
idx=i,
num_repeller=number_of_repeller,
to_start=self.to_start,
to_hold=self.to_hold,
to_end=self.to_end))
num_constrains += 1
logging.loginfo(f'Adding {num_constrains} external collision avoidance constraints.')

@profile
def add_self_collision_avoidance_constraints(self):
counter = defaultdict(int)
fixed_joints = god_map.collision_scene.fixed_joints
configs = god_map.collision_scene.collision_avoidance_configs
num_constr = 0
for robot_name in god_map.collision_scene.robot_names:
for link_a_o, link_b_o in god_map.world.groups[robot_name].possible_collision_combinations():
link_a_o, link_b_o = god_map.world.sort_links(link_a_o, link_b_o)
try:
if (link_a_o, link_b_o) in god_map.collision_scene.self_collision_matrix:
continue
link_a, link_b = god_map.world.compute_chain_reduced_to_controlled_joints(link_a_o, link_b_o,
fixed_joints)
link_a, link_b = god_map.world.sort_links(link_a, link_b)
counter[link_a, link_b] += 1
except KeyError as e:
# no controlled joint between both links
pass

for link_a, link_b in counter:
group_names = god_map.world.get_group_names_containing_link(link_a)
if len(group_names) != 1:
group_name = god_map.world.get_parent_group_name(group_names.pop())
else:
group_name = group_names.pop()
num_of_constraints = min(1, counter[link_a, link_b])
for i in range(num_of_constraints):
key = f'{link_a}, {link_b}'
key_r = f'{link_b}, {link_a}'
config = configs[group_name].self_collision_avoidance
if key in config:
hard_threshold = config[key].hard_threshold
soft_threshold = config[key].soft_threshold
number_of_repeller = config[key].number_of_repeller
elif key_r in config:
hard_threshold = config[key_r].hard_threshold
soft_threshold = config[key_r].soft_threshold
number_of_repeller = config[key_r].number_of_repeller
else:
# TODO minimum is not the best if i reduce to the links next to the controlled chains
# should probably add symbols that retrieve the values for the current pair
hard_threshold = min(config[link_a].hard_threshold,
config[link_b].hard_threshold)
soft_threshold = min(config[link_a].soft_threshold,
config[link_b].soft_threshold)
number_of_repeller = min(config[link_a].number_of_repeller,
config[link_b].number_of_repeller)
groups_a = god_map.world._get_group_name_containing_link(link_a)
groups_b = god_map.world._get_group_name_containing_link(link_b)
if groups_b == groups_a:
robot_name = groups_a
else:
raise Exception(f'Could not find group containing the link {link_a} and {link_b}.')
self.add_constraints_of_goal(SelfCollisionAvoidance(link_a=link_a,
link_b=link_b,
robot_name=robot_name,
name_prefix=self.name,
hard_threshold=hard_threshold,
soft_threshold=soft_threshold,
idx=i,
num_repeller=number_of_repeller,
to_start=self.to_start,
to_hold=self.to_hold,
to_end=self.to_end))
num_constr += 1
logging.loginfo(f'Adding {num_constr} self collision avoidance constraints.')
13 changes: 11 additions & 2 deletions src/giskardpy/goals/goal.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def __str__(self) -> str:
def __repr__(self) -> str:
return self.name

def has_tasks(self) -> bool:
return len(self.tasks) > 0

def get_joint_position_symbol(self, joint_name: PrefixName) -> Union[w.Symbol, float]:
"""
returns a symbol that refers to the given joint
Expand Down Expand Up @@ -120,8 +123,7 @@ def get_constraints(self) -> Tuple[Dict[str, EqualityConstraint],
self._inequality_constraints = OrderedDict()
self._derivative_constraints = OrderedDict()
self._debug_expressions = OrderedDict()
if not isinstance(self, NonMotionGoal) and not self.tasks:
raise ConstraintInitalizationException(f'Goal {str(self)} has no tasks.')
self._task_sanity_check()
for task in self.tasks:
for constraint in task.get_eq_constraints():
name = f'{task.name}/{constraint.name}'
Expand All @@ -139,6 +141,10 @@ def get_constraints(self) -> Tuple[Dict[str, EqualityConstraint],
return self._equality_constraints, self._inequality_constraints, self._derivative_constraints, \
self._debug_expressions

def _task_sanity_check(self):
if not self.has_tasks():
raise ConstraintInitalizationException(f'Goal {str(self)} has no tasks.')

def add_constraints_of_goal(self, goal: Goal):
for task in goal.tasks:
if not [t for t in self.tasks if t.name == task.name]:
Expand Down Expand Up @@ -168,3 +174,6 @@ class NonMotionGoal(Goal):

def make_constraints(self):
pass

def _task_sanity_check(self):
pass
15 changes: 12 additions & 3 deletions src/giskardpy/goals/monitors/monitor_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ def flipped_to_one(a: np.ndarray, b: np.ndarray) -> np.ndarray:
return np.logical_and(np.logical_not(a), np.logical_or(a, b))


def monitor_list_to_monitor_name_tuple(monitors: List[Union[str, Monitor]]) -> Tuple[str, ...]:
return tuple(sorted(monitor.name if isinstance(monitor, Monitor) else monitor for monitor in monitors))


class MonitorManager:
compiled_monitors: CompiledFunction
state: np.ndarray
Expand All @@ -42,6 +46,7 @@ def __init__(self):
self.robot_names = god_map.collision_scene.robot_names
self.local_minimum_monitor_id = None
self.substitution_values = {}
self.triggers = {}

def compile_monitors(self):
expressions = []
Expand Down Expand Up @@ -105,7 +110,7 @@ def register_expression_updater(self, expression: cas.PreservedCasType,
"""
Expression is updated when all monitors are 1 at the same time, but only once.
"""
monitor_names = tuple(sorted(monitor.name if isinstance(monitor, Monitor) else monitor for monitor in monitors))
monitor_names = monitor_list_to_monitor_name_tuple(monitors)
old_symbols = []
new_symbols = []
for i, symbol in enumerate(expression.free_symbols()):
Expand All @@ -116,11 +121,15 @@ def register_expression_updater(self, expression: cas.PreservedCasType,
return new_expression

def _register_expression_update_triggers(self):
self.triggers = {}
for monitor_names, values in self.substitution_values.items():
trigger_filter = tuple([i for i, m in enumerate(self.monitors) if m.name in monitor_names])
self.triggers[trigger_filter] = lambda: self.update_substitution_values(monitor_names, list(values.keys()))

def register_monitor_cb(self, monitor_names: Tuple[str, ...], cb: Callable):
monitor_names = monitor_list_to_monitor_name_tuple(monitor_names)
trigger_filter = tuple([i for i, m in enumerate(self.monitors) if m.name in monitor_names])
self.triggers[trigger_filter] = cb

@profile
def update_substitution_values(self, monitor_names: Tuple[str, ...], keys: Optional[List[str]] = None):
if keys is None:
Expand Down Expand Up @@ -160,7 +169,7 @@ def parse_monitors(self, monitor_msgs: List[giskard_msgs.Monitor]):
raise UnknownConstraintException(f'unknown monitor type: \'{monitor_msg.type}\'.')
try:
params = json_str_to_kwargs(monitor_msg.parameter_value_pair)
monitor: Monitor = C(monitor_msg.name, **params)
monitor: Monitor = C(name=monitor_msg.name, **params)
self.add_monitor(monitor)
except Exception as e:
traceback.print_exc()
Expand Down
2 changes: 1 addition & 1 deletion src/giskardpy/goals/monitors/monitors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union, List, TypeVar
from typing import Union, List, TypeVar, Optional

import numpy as np

Expand Down
Loading
Loading