Skip to content

Commit

Permalink
Futures and coroutines (#166)
Browse files Browse the repository at this point in the history
* Executor supports Futures and Tasks

Adds Task and Future classes based on asyncio
Callbacks can be coroutines, the executor runs them to completion
  • Loading branch information
sloretz authored Jan 17, 2018
1 parent ff5fbe1 commit ddc33ad
Show file tree
Hide file tree
Showing 4 changed files with 681 additions and 27 deletions.
96 changes: 69 additions & 27 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor
import inspect
import multiprocessing
from threading import Condition
from threading import Lock

from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.task import Task
from rclpy.timer import WallTimer
from rclpy.utilities import ok
from rclpy.utilities import timeout_sec_to_nsec
Expand Down Expand Up @@ -71,6 +73,16 @@ def wait(self, timeout_sec=None):
return True


async def await_or_execute(callback, *args):
"""Await a callback if it is a coroutine, else execute it."""
if inspect.iscoroutinefunction(callback):
# Await a coroutine
return await callback(*args)
else:
# Call a normal function
return callback(*args)


class TimeoutException(Exception):
"""Signal that a timeout occurred."""

Expand All @@ -93,6 +105,9 @@ def __init__(self):
super().__init__()
self._nodes = set()
self._nodes_lock = Lock()
# Tasks to be executed (oldest first) 3-tuple Task, Entity, Node
self._tasks = []
self._tasks_lock = Lock()
# This is triggered when wait_for_ready_callbacks should rebuild the wait list
gc, gc_handle = _rclpy.rclpy_create_guard_condition()
self._guard_condition = gc
Expand All @@ -105,6 +120,22 @@ def __init__(self):
self._last_args = None
self._last_kwargs = None

def create_task(self, callback, *args, **kwargs):
"""
Add a callback or coroutine to be executed during :meth:`spin` and return a Future.
Arguments to this function are passed to the callback.
:param callback: A callback to be run in the executor
:type callback: callable or coroutine function
:rtype: :class:`rclpy.task.Future` instance
"""
task = Task(callback, args, kwargs, executor=self)
with self._tasks_lock:
self._tasks.append((task, None, None))
# Task inherits from Future
return task

def shutdown(self, timeout_sec=None):
"""
Stop executing callbacks and wait for their completion.
Expand Down Expand Up @@ -175,23 +206,23 @@ def spin_once(self, timeout_sec=None):
def _take_timer(self, tmr):
_rclpy.rclpy_call_timer(tmr.timer_handle)

def _execute_timer(self, tmr, _):
tmr.callback()
async def _execute_timer(self, tmr, _):
await await_or_execute(tmr.callback)

def _take_subscription(self, sub):
msg = _rclpy.rclpy_take(sub.subscription_handle, sub.msg_type)
return msg

def _execute_subscription(self, sub, msg):
async def _execute_subscription(self, sub, msg):
if msg:
sub.callback(msg)
await await_or_execute(sub.callback, msg)

def _take_client(self, client):
response = _rclpy.rclpy_take_response(
client.client_handle, client.srv_type.Response, client.sequence_number)
return response

def _execute_client(self, client, response):
async def _execute_client(self, client, response):
if response:
# clients spawn their own thread to wait for a response in the
# wait_for_future function. Users can either use this mechanism or monitor
Expand All @@ -203,42 +234,35 @@ def _take_service(self, srv):
srv.service_handle, srv.srv_type.Request)
return request_and_header

def _execute_service(self, srv, request_and_header):
async def _execute_service(self, srv, request_and_header):
if request_and_header is None:
return
(request, header) = request_and_header
if request:
response = srv.callback(request, srv.srv_type.Response())
response = await await_or_execute(srv.callback, request, srv.srv_type.Response())
srv.send_response(response, header)

def _take_guard_condition(self, gc):
gc._executor_triggered = False

def _execute_guard_condition(self, gc, _):
gc.callback()
async def _execute_guard_condition(self, gc, _):
await await_or_execute(gc.callback)

def _make_handler(self, entity, take_from_wait_list, call_callback):
def _make_handler(self, entity, node, take_from_wait_list, call_coroutine):
"""
Make a handler that performs work on an entity.
:param entity: An entity to wait on
:param take_from_wait_list: Makes the entity to stop appearing in the wait list
:type take_from_wait_list: callable
:param call_callback: Does the work the entity is ready for
:type call_callback: callable
:param call_coroutine: Does the work the entity is ready for
:type call_coroutine: coroutine function
:rtype: callable
"""
gc = self._guard_condition
work_tracker = self._work_tracker
is_shutdown = self._is_shutdown
# Mark this so it doesn't get added back to the wait list
entity._executor_event = True

def handler():
nonlocal entity
nonlocal gc
nonlocal is_shutdown
nonlocal work_tracker
async def handler(entity, gc, is_shutdown, work_tracker):
if is_shutdown or not entity.callback_group.beginning_execution(entity):
# Didn't get the callback, or the executor has been ordered to stop
entity._executor_event = False
Expand All @@ -252,13 +276,18 @@ def handler():
_rclpy.rclpy_trigger_guard_condition(gc)

try:
call_callback(entity, arg)
await call_coroutine(entity, arg)
finally:
entity.callback_group.ending_execution(entity)
# Signal that work has been done so the next callback in a mutually exclusive
# callback group can get executed
_rclpy.rclpy_trigger_guard_condition(gc)
return handler
task = Task(
handler, (entity, self._guard_condition, self._is_shutdown, self._work_tracker),
executor=self)
with self._tasks_lock:
self._tasks.append((task, entity, node))
return task

def can_execute(self, entity):
"""
Expand Down Expand Up @@ -290,6 +319,18 @@ def _wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
if nodes is None:
nodes = self.get_nodes()

# Yield tasks in-progress before waiting for new work
tasks = None
with self._tasks_lock:
tasks = list(self._tasks)
if tasks:
for task, entity, node in reversed(tasks):
if not task.executing() and not task.done() and (node is None or node in nodes):
yield task, entity, node
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))

yielded_work = False
while not yielded_work and not self._is_shutdown:
# Gather entities that can be waited on
Expand Down Expand Up @@ -368,39 +409,40 @@ def _wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr.callback_group.can_execute(tmr):
handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
tmr, node, self._take_timer, self._execute_timer)
yielded_work = True
yield handler, tmr, node

for sub in node.subscriptions:
if sub.subscription_pointer in subs_ready:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
sub, node, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

for gc in node.guards:
if gc._executor_triggered:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
gc, node, self._take_guard_condition,
self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

for client in node.clients:
if client.client_pointer in clients_ready:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
client, node, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

for srv in node.services:
if srv.service_pointer in services_ready:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
srv, self._take_service, self._execute_service)
srv, node, self._take_service, self._execute_service)
yielded_work = True
yield handler, srv, node

Expand Down
Loading

0 comments on commit ddc33ad

Please sign in to comment.