Skip to content

Commit

Permalink
Squashed rclpy_wait_for_service for rebasing ease
Browse files Browse the repository at this point in the history
Dirty: First proof of concept for wait_for_service implementation

Moved wait set code to its own class for code reuse

Added timeout_sec_to_nsec()

wait_for_service() implemented with timers

Added unit tests for timeout_sec_to_nsec()

Added test for WaitSet class

Use negative timeouts to mean block forever

Double quotes to single quotes

Added wait_for_service() tests and fixed bugs it caught

Eliminate blind exception warning

Reduce flakiness of test by increasing time to 0.1s

Comment says negative timeouts block forever

Use :returns:

Move add_subscriptions()

arugments -> arguments

Daemon as keyword arg

Remove unnecessary namespace argument

Use S_TO_NS in test

More tests using S_TO_NS

Use monotonic clock for testing timer time

Increased test timeout by 30 seconds

CheckExact -> IsValid

Fixed wait_set not clearing ready_pointers

Remove unnecessary namespace keyword arg

Non-blocking wait

remove expression that always evaluates to True

Raise ValueError on invalid capsule

Simplified timeout_sec_to_nsec

Added 'WaitSet.destroy()' and made executor use it

GraphListener periodically checks if rclpy is shutdown

Rebase artifacts
  • Loading branch information
sloretz committed Oct 30, 2017
1 parent 7298020 commit d980051
Show file tree
Hide file tree
Showing 10 changed files with 813 additions and 111 deletions.
2 changes: 1 addition & 1 deletion rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ if(BUILD_TESTING)
WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}"
PYTHON_EXECUTABLE "${PYTHON_EXECUTABLE}"
APPEND_ENV AMENT_PREFIX_PATH=${ament_index_build_path}
TIMEOUT 90
TIMEOUT 120
)
endif()
endif()
Expand Down
37 changes: 37 additions & 0 deletions rclpy/rclpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import threading

from rclpy.graph_listener import GraphEventSubscription as _GraphEventSubscription
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
import rclpy.utilities

Expand Down Expand Up @@ -77,3 +78,39 @@ def wait_for_future(self):
thread1 = ResponseThread(self)
thread1.start()
thread1.join()

def service_is_ready(self):
return _rclpy.rclpy_service_server_is_available(self.node_handle, self.client_handle)

def wait_for_service(self, timeout_sec=None):
"""
Block until the service is available.
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:rtype: bool
:returns: true if the service is available
"""
timeout_nsec = rclpy.utilities.timeout_sec_to_nsec(timeout_sec)
result = self.service_is_ready()
if result or timeout_sec == 0:
return result

event = threading.Event()

def on_graph_event():
nonlocal self
nonlocal event
nonlocal result
result = self.service_is_ready()
if result:
event.set()

def on_timeout():
nonlocal event
event.set()

with _GraphEventSubscription(self.node_handle, on_graph_event, timeout_nsec, on_timeout):
event.wait()

return result
186 changes: 76 additions & 110 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,11 @@
from threading import Condition as _Condition
from threading import Lock as _Lock

from rclpy.constants import S_TO_NS
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.timer import WallTimer as _WallTimer
from rclpy.utilities import ok


class _WaitSet:
"""Make sure the wait set gets destroyed when a generator exits."""

def __enter__(self):
self.wait_set = _rclpy.rclpy_get_zero_initialized_wait_set()
return self.wait_set

def __exit__(self, t, v, tb):
_rclpy.rclpy_destroy_wait_set(self.wait_set)
from rclpy.utilities import timeout_sec_to_nsec
from rclpy.wait_set import WaitSet as _WaitSet


class _WorkTracker:
Expand All @@ -57,16 +47,18 @@ def wait(self, timeout_sec=None):
"""
Wait until all work completes.
:param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:rtype: bool True if all work completed
:rtype: bool
:returns: True if all work completed
"""
if timeout_sec is not None and timeout_sec < 0:
timeout_sec = None
# Wait for all work to complete
if timeout_sec is None or timeout_sec >= 0:
with self._work_condition:
if not self._work_condition.wait_for(
lambda: self._num_work_executing == 0, timeout_sec):
return False
with self._work_condition:
if not self._work_condition.wait_for(
lambda: self._num_work_executing == 0, timeout_sec):
return False
return True


Expand Down Expand Up @@ -100,7 +92,7 @@ def shutdown(self, timeout_sec=None):
Return true if all outstanding callbacks finished executing.
:param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:rtype: bool
"""
Expand Down Expand Up @@ -152,7 +144,7 @@ def spin_once(self, timeout_sec=None):
A custom executor should use :func:`Executor.wait_for_ready_callbacks` to get work.
:param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:rtype: None
"""
Expand Down Expand Up @@ -260,21 +252,15 @@ def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
"""
Yield callbacks that are ready to be performed.
:param timeout_sec: Seconds to wait. Block forever if None. Don't wait if <= 0
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:param nodes: A list of nodes to wait on. Wait on all nodes if None.
:type nodes: list or None
:rtype: Generator[(callable, entity, :class:`rclpy.node.Node`)]
"""
timeout_timer = None
# Get timeout in nanoseconds. 0 = don't wait. < 0 means block forever
timeout_nsec = None
if timeout_sec is None:
timeout_nsec = -1
elif timeout_sec <= 0:
timeout_nsec = 0
else:
timeout_nsec = int(float(timeout_sec) * S_TO_NS)
timeout_nsec = timeout_sec_to_nsec(timeout_sec)
if timeout_nsec > 0:
timeout_timer = _WallTimer(None, None, timeout_nsec)

if nodes is None:
Expand Down Expand Up @@ -305,93 +291,73 @@ def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):

# Construct a wait set
with _WaitSet() as wait_set:
_rclpy.rclpy_wait_set_init(
wait_set,
len(subscriptions),
len(guards) + 2,
len(timers),
len(clients),
len(services))

entities = {
'subscription': (subscriptions, 'subscription_handle'),
'guard_condition': (guards, 'guard_handle'),
'client': (clients, 'client_handle'),
'service': (services, 'service_handle'),
'timer': (timers, 'timer_handle'),
}
for entity, (handles, handle_name) in entities.items():
_rclpy.rclpy_wait_set_clear_entities(entity, wait_set)
for h in handles:
_rclpy.rclpy_wait_set_add_entity(
entity, wait_set, h.__getattribute__(handle_name)
)
_rclpy.rclpy_wait_set_add_entity('guard_condition', wait_set, sigint_gc)
_rclpy.rclpy_wait_set_add_entity(
'guard_condition', wait_set, self._guard_condition)
wait_set.add_subscriptions(subscriptions)
wait_set.add_clients(clients)
wait_set.add_services(services)
wait_set.add_timers(timers)
wait_set.add_guard_conditions(guards)
wait_set.add_guard_condition(sigint_gc, sigint_gc_handle)
wait_set.add_guard_condition(self._guard_condition, self._guard_condition_handle)

# Wait for something to become ready
_rclpy.rclpy_wait(wait_set, timeout_nsec)

# get ready entities
subs_ready = _rclpy.rclpy_get_ready_entities('subscription', wait_set)
guards_ready = _rclpy.rclpy_get_ready_entities('guard_condition', wait_set)
timers_ready = _rclpy.rclpy_get_ready_entities('timer', wait_set)
clients_ready = _rclpy.rclpy_get_ready_entities('client', wait_set)
services_ready = _rclpy.rclpy_get_ready_entities('service', wait_set)
wait_set.wait(timeout_nsec)

# Check sigint guard condition
if wait_set.is_ready(sigint_gc_handle):
raise KeyboardInterrupt
_rclpy.rclpy_destroy_entity(sigint_gc)

# Mark all guards as triggered before yielding since they're auto-taken
for gc in [g for g in guards if wait_set.is_ready(g.guard_pointer)]:
gc._executor_triggered = True

# Process ready entities one node at a time
for node in nodes:
for tmr in [t for t in timers if wait_set.is_ready(t.timer_pointer)]:
# Check that a timer is ready to workaround rcl issue with cancelled timers
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr == timeout_timer:
continue
elif tmr.callback_group.can_execute(tmr):
handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
yielded_work = True
yield handler, tmr, node

for sub in [s for s in subscriptions if wait_set.is_ready(
s.subscription_pointer)]:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

# Check sigint guard condition
if sigint_gc_handle in guards_ready:
raise KeyboardInterrupt
_rclpy.rclpy_destroy_entity(sigint_gc)
for gc in [g for g in node.guards if g._executor_triggered]:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

# Mark all guards as triggered before yielding any handlers since they're auto-taken
for gc in [g for g in guards if g.guard_pointer in guards_ready]:
gc._executor_triggered = True
for client in [c for c in clients if wait_set.is_ready(c.client_pointer)]:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

# Process ready entities one node at a time
for node in nodes:
for tmr in [t for t in node.timers if t.timer_pointer in timers_ready]:
# Check that a timer is ready to workaround rcl issue with cancelled timers
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr.callback_group.can_execute(tmr):
for srv in [s for s in services if wait_set.is_ready(s.service_pointer)]:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
srv, self._take_service, self._execute_service)
yielded_work = True
yield handler, tmr, node

for sub in [s for s in node.subscriptions if s.subscription_pointer in subs_ready]:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

for gc in [g for g in node.guards if g._executor_triggered]:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

for client in [c for c in node.clients if c.client_pointer in clients_ready]:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

for srv in [s for s in node.services if s.service_pointer in services_ready]:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
srv, self._take_service, self._execute_service)
yielded_work = True
yield handler, srv, node

# Check timeout timer
if (timeout_nsec == 0 or
(timeout_timer is not None and timeout_timer.timer_pointer in timers_ready)):
break
yield handler, srv, node

# Check timeout timer
if (timeout_nsec == 0 or
(timeout_timer is not None and wait_set.is_ready(
timeout_timer.timer_pointer))):
break


class SingleThreadedExecutor(Executor):
Expand Down
Loading

0 comments on commit d980051

Please sign in to comment.