Skip to content

Commit

Permalink
Added 'WaitSet.destroy()' and made executor use it
Browse files Browse the repository at this point in the history
  • Loading branch information
sloretz committed Oct 24, 2017
1 parent be25673 commit d8521cd
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 66 deletions.
131 changes: 66 additions & 65 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,73 +290,74 @@ def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
timers.append(timeout_timer)

# Construct a wait set
wait_set = _WaitSet()
wait_set.add_subscriptions(subscriptions)
wait_set.add_clients(clients)
wait_set.add_services(services)
wait_set.add_timers(timers)
wait_set.add_guard_conditions(guards)
wait_set.add_guard_condition(sigint_gc, sigint_gc_handle)
wait_set.add_guard_condition(self._guard_condition, self._guard_condition_handle)

# Wait for something to become ready
wait_set.wait(timeout_nsec)

# Check sigint guard condition
if wait_set.is_ready(sigint_gc_handle):
raise KeyboardInterrupt
_rclpy.rclpy_destroy_entity('guard_condition', sigint_gc)

# Mark all guards as triggered before yielding any handlers since they're auto-taken
for gc in [g for g in guards if wait_set.is_ready(g.guard_pointer)]:
gc._executor_triggered = True

# Process ready entities one node at a time
for node in nodes:
for tmr in [t for t in timers if wait_set.is_ready(t.timer_pointer)]:
# Check that a timer is ready to workaround rcl issue with cancelled timers
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr == timeout_timer:
continue
elif tmr.callback_group.can_execute(tmr):
with _WaitSet() as wait_set:
wait_set.add_subscriptions(subscriptions)
wait_set.add_clients(clients)
wait_set.add_services(services)
wait_set.add_timers(timers)
wait_set.add_guard_conditions(guards)
wait_set.add_guard_condition(sigint_gc, sigint_gc_handle)
wait_set.add_guard_condition(self._guard_condition, self._guard_condition_handle)

# Wait for something to become ready
wait_set.wait(timeout_nsec)

# Check sigint guard condition
if wait_set.is_ready(sigint_gc_handle):
raise KeyboardInterrupt
_rclpy.rclpy_destroy_entity('guard_condition', sigint_gc)

# Mark all guards as triggered before yielding since they're auto-taken
for gc in [g for g in guards if wait_set.is_ready(g.guard_pointer)]:
gc._executor_triggered = True

# Process ready entities one node at a time
for node in nodes:
for tmr in [t for t in timers if wait_set.is_ready(t.timer_pointer)]:
# Check that a timer is ready to workaround rcl issue with cancelled timers
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr == timeout_timer:
continue
elif tmr.callback_group.can_execute(tmr):
handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
yielded_work = True
yield handler, tmr, node

for sub in [s for s in subscriptions if wait_set.is_ready(
s.subscription_pointer)]:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

for gc in [g for g in node.guards if g._executor_triggered]:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
gc, self._take_guard_condition, self._execute_guard_condition)
yielded_work = True
yield handler, tmr, node

for sub in [s for s in subscriptions if wait_set.is_ready(s.subscription_pointer)]:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

for gc in [g for g in node.guards if g._executor_triggered]:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

for client in [c for c in clients if wait_set.is_ready(c.client_pointer)]:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

for srv in [s for s in services if wait_set.is_ready(s.service_pointer)]:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
srv, self._take_service, self._execute_service)
yielded_work = True
yield handler, srv, node

# Check timeout timer
if (timeout_nsec == 0 or
(timeout_timer is not None and wait_set.is_ready(
timeout_timer.timer_pointer))):
break
yield handler, gc, node

for client in [c for c in clients if wait_set.is_ready(c.client_pointer)]:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

for srv in [s for s in services if wait_set.is_ready(s.service_pointer)]:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
srv, self._take_service, self._execute_service)
yielded_work = True
yield handler, srv, node

# Check timeout timer
if (timeout_nsec == 0 or
(timeout_timer is not None and wait_set.is_ready(
timeout_timer.timer_pointer))):
break


class SingleThreadedExecutor(Executor):
Expand Down
12 changes: 11 additions & 1 deletion rclpy/rclpy/wait_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,19 @@ def __init__(self):
# rcl_wait is not thread safe, so prevent multiple wait calls at once
self._wait_lock = threading.Lock()

def __del__(self):
def destroy(self):
if self._wait_set is not None:
_rclpy.rclpy_destroy_wait_set(self._wait_set)
self._wait_set = None

def __del__(self):
self.destroy()

def __enter__(self):
return self

def __exit__(self, t, v, tb):
self.destroy()

def add_subscription(self, subscription_handle, subscription_pointer):
self._subscriptions[subscription_pointer] = subscription_handle
Expand Down

0 comments on commit d8521cd

Please sign in to comment.