Skip to content

Commit

Permalink
Move entity specific work to ExecutorHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
sloretz committed Nov 17, 2017
1 parent c74c44a commit 0f0e20a
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 84 deletions.
17 changes: 15 additions & 2 deletions rclpy/rclpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import threading

from rclpy.executor_handle import ExecutorHandle
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.impl.implementation_singleton import rclpy_wait_set_implementation as _rclpy_wait_set
import rclpy.utilities
Expand Down Expand Up @@ -60,8 +61,20 @@ def __init__(
self.sequence_number = 0
self.response = None
self.callback_group = callback_group
# True when the callback is ready to fire but has not been "taken" by an executor
self._executor_event = False
# Holds info the executor uses to do work for this entity
self._executor_handle = ExecutorHandle(self._take, self._execute)

def _take(self):
response = _rclpy.rclpy_take_response(
self.client_handle, self.srv_type.Response, self.sequence_number)
return response

def _execute(self, response):
if response:
# clients spawn their own thread to wait for a response in the
# wait_for_future function. Users can either use this mechanism or monitor
# the content of client.response to check if a response has been received
self.response = response

def call(self, req):
self.response = None
Expand Down
68 changes: 68 additions & 0 deletions rclpy/rclpy/executor_handle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import queue

import rclpy.future
from rclpy.utilities import timeout_sec_to_nsec


class ExecutorHandle:
"""Interface between an entity and an executor."""

def __init__(self, take_callback, execute_callback, cancel_ready_callback=None):
self._take_from_wait_list = take_callback
self.execute_callback = execute_callback
self._cancel_ready_callback = cancel_ready_callback
# Tasks the executor should schedule
self._tasks = queue.Queue()
# True when the callback is ready to fire but has not been "taken" by an executor
self._ready = False

def notify_ready(self):
"""Receive notification from executor that this entity was ready in the wait list."""
self._ready = True

def cancel_ready(self):
"""Receive notification from executor that this entity could not be taken."""
self._ready = False
# Hook for guard conditions to retrigger themselves
if self._cancel_ready_callback:
self._cancel_ready_callback

def ready(self):
return self._ready

def take_from_wait_list(self):
"""Get data from rcl."""
args = self._take_from_wait_list()
self._ready = False
return args

def schedule_task(self, handler, timeout_sec=None, on_timeout=None):
"""
Schedule a task on an executor that expires.
:param handler: callable or coroutine to run. Return value is the task result
:param timeout_sec: How long to wait before calling the on_timeout handler
:param on_timeout: callable or coroutine to run on timeout
:returns: a future for the task
:rtype: rclpy.future.Future
"""
# Add a task to be consumed by the executor
task = rclpy.future.Task(handler)
future = rclpy.future.Future(task)
timeout_nsec = timeout_sec_to_nsec(timeout_sec)
self._tasks.put((handler, timeout_nsec, on_timeout))
return future
86 changes: 12 additions & 74 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor
import inspect
import multiprocessing
from threading import Condition
from threading import Lock
Expand Down Expand Up @@ -130,92 +129,33 @@ def spin_once(self, timeout_sec=None):
"""
raise NotImplementedError()

def _take_timer(self, tmr):
_rclpy.rclpy_call_timer(tmr.timer_handle)

def _execute_timer(self, tmr, _):
return tmr.callback()

def _take_subscription(self, sub):
msg = _rclpy.rclpy_take(sub.subscription_handle, sub.msg_type)
return msg

def _execute_subscription(self, sub, msg):
if msg:
return sub.callback(msg)

def _take_client(self, client):
response = _rclpy.rclpy_take_response(
client.client_handle, client.srv_type.Response, client.sequence_number)
return response

def _execute_client(self, client, response):
if response:
# clients spawn their own thread to wait for a response in the
# wait_for_future function. Users can either use this mechanism or monitor
# the content of client.response to check if a response has been received
client.response = response

def _take_service(self, srv):
request_and_header = _rclpy.rclpy_take_request(
srv.service_handle, srv.srv_type.Request)
return request_and_header

def _execute_service(self, srv, request_and_header):
if request_and_header is None:
return
(request, header) = request_and_header
if request:

async def exec(request, header):
"""Enable service callbacks to be coroutines."""
response = srv.callback(request, srv.srv_type.Response())
if inspect.isawaitable(response):
response = await response
srv.send_response(response, header)

return exec(request, header)

def _take_guard_condition(self, gc):
pass

def _execute_guard_condition(self, gc, _):
return gc.callback()

def _make_task(self, entity, take_from_wait_list, call_callback):
def _make_task(self, entity):
"""
Make a task that performs work on an entity.
:param entity: An entity to wait on
:param take_from_wait_list: Makes the entity to stop appearing in the wait list
:type take_from_wait_list: callable
:param call_callback: Does the work the entity is ready for
:type call_callback: callable
:rtype: callable
:rtype: rclpy.future.Task
"""
gc = self._guard_condition
is_shutdown = self._is_shutdown
# Mark this so it doesn't get added back to the wait list
entity._executor_event = True
entity._executor_handle.notify_ready()

def execute():
nonlocal call_callback
nonlocal entity
nonlocal gc
nonlocal is_shutdown
nonlocal take_from_wait_list
if is_shutdown or not entity.callback_group.beginning_execution(entity):
# Didn't get the callback, or the executor has been ordered to stop
entity._executor_event = False
entity._executor_handle.cancel_ready()
gc.trigger()
return

# Signal that this has been 'taken' and can be added back to the wait list
arg = take_from_wait_list(entity)
entity._executor_event = False
arg = entity._executor_handle.take_from_wait_list()
gc.trigger()
# Return result of callback (Task will do the right thing if this is a coroutine)
return call_callback(entity, arg)
return entity._executor_handle.execute_callback(arg)

def postExecute():
nonlocal entity
Expand Down Expand Up @@ -248,29 +188,27 @@ def _process_new_callbacks(self, nodes, wait_set):
# TODO(Sloretz) Which rcl cancelled timer bug does this workaround?
if not _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
continue
task = self._make_task(tmr, self._take_timer, self._execute_timer)
task = self._make_task(tmr)
self._tasks.append((task, tmr, node))

for sub in node.subscriptions:
if wait_set.is_ready(sub) and sub.callback_group.can_execute(sub):
task = self._make_task(
sub, self._take_subscription, self._execute_subscription)
task = self._make_task(sub)
self._tasks.append((task, sub, node))

for gc in node.guards:
if wait_set.is_ready(gc) and gc.callback_group.can_execute(gc):
task = self._make_task(
gc, self._take_guard_condition, self._execute_guard_condition)
task = self._make_task(gc)
self._tasks.append((task, gc, node))

for cli in node.clients:
if wait_set.is_ready(cli) and cli.callback_group.can_execute(cli):
task = self._make_task(cli, self._take_client, self._execute_client)
task = self._make_task(cli)
self._tasks.append((task, cli, node))

for srv in node.services:
if wait_set.is_ready(srv) and srv.callback_group.can_execute(srv):
task = self._make_task(srv, self._take_service, self._execute_service)
task = self._make_task(srv)
self._tasks.append((task, srv, node))

def can_execute(self, entity):
Expand All @@ -281,7 +219,7 @@ def can_execute(self, entity):
:returns: True if the entity callback can be executed
:rtype: bool
"""
return not entity._executor_event and entity.callback_group.can_execute(entity)
return not entity._executor_handle.ready() and entity.callback_group.can_execute(entity)

def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
"""
Expand Down
16 changes: 14 additions & 2 deletions rclpy/rclpy/guard_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from rclpy.executor_handle import ExecutorHandle
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy


Expand All @@ -21,8 +22,19 @@ def __init__(self, callback, callback_group):
self.guard_handle, self.guard_pointer = _rclpy.rclpy_create_guard_condition()
self.callback = callback
self.callback_group = callback_group
# True when the callback is ready to fire but has not been "taken" by an executor
self._executor_event = False
# Holds info the executor uses to do work for this entity
self._executor_handle = ExecutorHandle(
self._take, self._execute, cancel_ready_callback=self._cancel_ready)

def _take(self):
pass

def _execute(self, _):
return self.callback()

def _cancel_ready(self):
# guard conditions are auto-taken when they come up in the wait-list, so retrigger
self.trigger()

def trigger(self):
_rclpy.rclpy_trigger_guard_condition(self.guard_handle)
29 changes: 27 additions & 2 deletions rclpy/rclpy/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect

from rclpy.executor_handle import ExecutorHandle
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy


Expand All @@ -26,9 +29,31 @@ def __init__(
self.srv_name = srv_name
self.callback = callback
self.callback_group = callback_group
# True when the callback is ready to fire but has not been "taken" by an executor
self._executor_event = False
# Holds info the executor uses to do work for this entity
self._executor_handle = ExecutorHandle(self._take, self._execute)
self.qos_profile = qos_profile

def _take(self):
request_and_header = _rclpy.rclpy_take_request(self.service_handle, self.srv_type.Request)
return request_and_header

def _execute(self, request_and_header):
if request_and_header is None:
return
(request, header) = request_and_header
if request:

async def exec_service(request, header):
"""Enable service callbacks to be coroutines."""
nonlocal self
response = self.callback(request, self.srv_type.Response())
if inspect.isawaitable(response):
response = await response
# This special method is needed to make sure send_response gets called after the
# user's callback even if it yields
self.send_response(response, header)

return exec_service(request, header)

def send_response(self, response, header):
_rclpy.rclpy_send_response(self.service_handle, response, header)
15 changes: 13 additions & 2 deletions rclpy/rclpy/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from rclpy.executor_handle import ExecutorHandle
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy


class Subscription:

Expand All @@ -25,6 +28,14 @@ def __init__(
self.topic = topic
self.callback = callback
self.callback_group = callback_group
# True when the callback is ready to fire but has not been "taken" by an executor
self._executor_event = False
# Holds info the executor uses to do work for this entity
self._executor_handle = ExecutorHandle(self._take, self._execute)
self.qos_profile = qos_profile

def _take(self):
msg = _rclpy.rclpy_take(self.subscription_handle, self.msg_type)
return msg

def _execute(self, msg):
if msg:
return self.callback(msg)
11 changes: 9 additions & 2 deletions rclpy/rclpy/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from rclpy.executor_handle import ExecutorHandle
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy


Expand All @@ -23,8 +24,14 @@ def __init__(self, callback, callback_group, timer_period_ns):
self.timer_period_ns = timer_period_ns
self.callback = callback
self.callback_group = callback_group
# True when the callback is ready to fire but has not been "taken" by an executor
self._executor_event = False
# Holds info the executor uses to do work for this entity
self._executor_handle = ExecutorHandle(self._take, self._execute)

def _take(self):
_rclpy.rclpy_call_timer(self.timer_handle)

def _execute(self, _):
return self.callback()

@property
def timer_period_ns(self):
Expand Down

0 comments on commit 0f0e20a

Please sign in to comment.