Skip to content

Commit

Permalink
HITL - Add selection panel and object state manipulation UI. (#2058)
Browse files Browse the repository at this point in the history
* Add selection panel and object state manipulation UI.

* Code clean-up.
  • Loading branch information
0mdc authored Sep 4, 2024
1 parent 7ef3d83 commit 240adce
Show file tree
Hide file tree
Showing 3 changed files with 347 additions and 3 deletions.
18 changes: 18 additions & 0 deletions examples/hitl/rearrange_v2/rearrange_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ def __init__(
self.gui_input,
)

# HACK: Available actions are hardcoded by agent type.
# Only humans can change object states.
can_change_object_states = isinstance(
self.gui_agent_controller, GuiHumanoidController
)

self.ui = UI(
hitl_config=app_service.hitl_config,
user_index=user_index,
Expand All @@ -223,6 +229,7 @@ def __init__(
gui_input=self.gui_input,
gui_drawer=app_service.gui_drawer,
camera_helper=self.camera_helper,
can_change_object_states=can_change_object_states,
)

self.end_episode_form = EndEpisodeForm(user_index, app_service)
Expand All @@ -232,6 +239,7 @@ def __init__(
self.ui.on_place.registerCallback(self._on_place)
self.ui.on_open.registerCallback(self._on_open)
self.ui.on_close.registerCallback(self._on_close)
self.ui.on_state_change.registerCallback(self._on_state_change)

self.end_episode_form.on_cancel.registerCallback(
self._on_episode_form_cancelled
Expand Down Expand Up @@ -391,6 +399,16 @@ def _on_close(self, e: UI.CloseEventData):
}
)

def _on_state_change(self, e: UI.StateChangeEventData):
self.ui_events.append(
{
"type": "state_change",
"obj_handle": e.object_handle,
"state_name": e.state_name,
"new_value": e.new_value,
}
)

def _on_episode_form_cancelled(self, _e: Any = None):
self.ui_events.append(
{
Expand Down
230 changes: 228 additions & 2 deletions examples/hitl/rearrange_v2/ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,20 @@

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Callable, List, Optional, Set, Tuple, cast
from functools import partial
from typing import (
TYPE_CHECKING,
Any,
Callable,
List,
Optional,
Set,
Tuple,
cast,
)

import magnum as mn
from ui_overlay import UIOverlay
from ui_overlay import ObjectStateControl, UIOverlay
from world import World

import habitat.sims.habitat_simulator.sim_utilities as sutils
Expand Down Expand Up @@ -73,6 +83,7 @@ def __init__(
gui_input: GuiInput,
gui_drawer: GuiDrawer,
camera_helper: CameraHelper,
can_change_object_states: bool,
):
self._user_index = user_index
self._dest_mask = Mask.from_index(self._user_index)
Expand All @@ -82,6 +93,7 @@ def __init__(
self._gui_input = gui_input
self._gui_drawer = gui_drawer
self._camera_helper = camera_helper
self._can_change_object_states = can_change_object_states

self._can_grasp_place_threshold = hitl_config.can_grasp_place_threshold

Expand Down Expand Up @@ -132,12 +144,16 @@ def place_selection_fn(gui_input: GuiInput) -> bool:
# Set up UI overlay
self._ui_overlay = UIOverlay(app_service, user_index)
self._is_help_shown = True
self._last_changed_state_timestamp: Optional[
Tuple[str, datetime]
] = None

# Set up user events
self._on_pick = Event()
self._on_place = Event()
self._on_open = Event()
self._on_close = Event()
self._on_object_state_change = Event()

# Disable the snap manager automatic object positioning so that object placement is controlled here.
self._get_grasp_manager()._automatically_update_snapped_object = False
Expand Down Expand Up @@ -195,6 +211,16 @@ class CloseEventData:
def on_close(self) -> Event:
return self._on_close

@dataclass
class StateChangeEventData:
object_handle: str
state_name: str
new_value: Any

@property
def on_state_change(self) -> Event:
return self._on_object_state_change

def selection_discriminator_ignore_agents(self, object_id: int) -> bool:
"""Allow selection through agents."""
return object_id not in self._world._agent_object_ids
Expand Down Expand Up @@ -262,6 +288,8 @@ def _handle_double_click() -> bool:
if not self._gui_input.get_mouse_button(MouseButton.RIGHT):
self._place_selection.deselect()

# Update the UI overlay.
# This activates callbacks on UI events.
self._ui_overlay.update()

def draw_ui(self) -> None:
Expand Down Expand Up @@ -630,6 +658,204 @@ def _update_hovered_object_ui(self):

def _update_selected_object_ui(self):
"""Draw a UI for the currently selected object."""
object_id = self._click_selection.object_id

object_category_name: Optional[str] = None
object_state_controls: List[ObjectStateControl] = []
primary_region_name: Optional[str] = None
contextual_info: Optional[str] = None
contextual_color: Optional[List[float]] = None

if object_id is None:
return

if object_id is not None:
world = self._world
sim = self._sim
obj = sutils.get_obj_from_id(
sim, object_id, world._link_id_to_ao_map
)
if obj is not None:
# Get object category name.
object_category_name = world.get_category_from_handle(
obj.handle
)
if object_category_name is None:
object_category_name = "Object"

# Get object state controls.
object_state_controls = self._get_object_state_controls(
obj.handle
)

# Get the primary region name.
primary_region = world.get_primary_object_region(obj)
if primary_region is not None:
primary_region_name = primary_region.category.name()

# Get the contextual information.
color_ui_valid = [0.2, 1.0, 0.2, 1.0]
color_ui_invalid = [1.0, 0.2, 0.2, 1.0]
if self._is_object_pickable(object_id):
if self._held_object_id == None:
if self._world.is_any_agent_holding_object(object_id):
contextual_info = (
"The object is held by another agent."
)
contextual_color = color_ui_invalid
else:
if self._is_within_reach(obj.translation):
contextual_info = "Double-click to pick up."
contextual_color = color_ui_valid
else:
contextual_info = "Too far to pick up."
contextual_color = color_ui_invalid
elif self._held_object_id == object_id:
if self._place_selection.point is not None:
point = self._place_selection.point
normal = self._place_selection.normal
receptacle_object_id = (
self._place_selection.object_id
)
placement_valid = (
self._is_location_suitable_for_placement(
point, normal, receptacle_object_id
)
)
if placement_valid:
contextual_info = "Release to place."
contextual_color = color_ui_valid
else:
contextual_info = "Cannot place object here."
contextual_color = color_ui_invalid
else:
contextual_info = "Hold right-click to place."
elif self._is_object_interactable(object_id):
link_id = object_id
link_index = self._world.get_link_index(link_id)
if link_index:
action_name = (
"close"
if object_id in world._opened_link_set
else "open"
)

if self._held_object_id is not None:
contextual_info = f"Cannot {action_name} while holding an object."
contextual_color = color_ui_invalid
else:
ao_id = self._world._link_id_to_ao_map[link_id]
ao = self._world.get_articulated_object(ao_id)
link_node = ao.get_link_scene_node(link_index)
link_pos = link_node.translation

if self._is_within_reach(link_pos):
contextual_info = (
f"Double-click to {action_name}."
)
contextual_color = color_ui_valid
else:
contextual_info = f"Too far to {action_name}."
contextual_color = color_ui_invalid

# Update the UI.
self._ui_overlay.update_selected_object_panel(
object_category_name=object_category_name,
object_state_controls=object_state_controls,
primary_region_name=primary_region_name,
contextual_info=contextual_info,
contextual_color=contextual_color,
)

def _get_object_state_controls(
self, object_handle: str
) -> List[ObjectStateControl]:
"""
Create a list of object state manipulation controls for the specified object.
"""
# Requires the object state
osm = self._object_state_manipulator
if osm is None:
return []

object_state_controls: List[ObjectStateControl] = []

# Get all possible actions for this object.
all_possible_actions = osm.get_all_available_boolean_actions(
object_handle
)
for action in all_possible_actions:
spec = action.state_spec
recently_changed = False

# If this user can manipulate object states...
if self._can_change_object_states:
enabled = action.enabled
available = action.available
tooltip = (
action.error if action.available else "Action unavailable."
)
callback = partial(
self._state_change_callback,
spec.name,
action.target_value,
object_handle,
)

if (
self._last_changed_state_timestamp is not None
and self._last_changed_state_timestamp[0] == spec.name
):
time_since_last_state_change = (
datetime.now() - self._last_changed_state_timestamp[1]
)
recently_changed = (
time_since_last_state_change < timedelta(seconds=2.0)
)
if recently_changed:
tooltip = "Action executed."

# If this user cannot manipulate object states...
else:
enabled = False
available = False
tooltip = "The robot cannot do this action."
callback = None

object_state_controls.append(
ObjectStateControl(
spec=spec,
value=action.current_value,
enabled=enabled,
available=available,
callback=callback,
tooltip=tooltip,
recently_changed=recently_changed,
)
)
return object_state_controls

def _state_change_callback(
self, state_name: str, target_value: Any, object_handle: str
):
# Requires object state manipulator.
osm = self._object_state_manipulator
if osm is None or not self._can_change_object_states:
return

success, _ = osm.try_execute_action(
state_name, target_value, object_handle
)
if success:
self._on_object_state_change.invoke(
UI.StateChangeEventData(
object_handle=object_handle,
state_name=state_name,
new_value=target_value,
)
)

self._last_changed_state_timestamp = (state_name, datetime.now())

def _draw_aabb(
self, aabb: mn.Range3D, transform: mn.Matrix4, color: mn.Color3
Expand Down
Loading

0 comments on commit 240adce

Please sign in to comment.