From 7963c1d098fc17a5c2982b80318876439dd95d54 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Wed, 20 Mar 2019 12:46:04 -0300 Subject: [PATCH 1/3] Add run_async coroutine to LaunchService. Signed-off-by: Michel Hidalgo --- launch/launch/launch_service.py | 369 ++++++++++--------- launch/launch/utilities/signal_management.py | 16 +- launch/test/launch/test_launch_service.py | 40 +- 3 files changed, 230 insertions(+), 195 deletions(-) diff --git a/launch/launch/launch_service.py b/launch/launch/launch_service.py index 463adad6f..eaaa087a9 100644 --- a/launch/launch/launch_service.py +++ b/launch/launch/launch_service.py @@ -17,11 +17,13 @@ import asyncio import atexit import collections.abc +import contextlib import logging import os import signal import threading import traceback +from typing import Coroutine from typing import Iterable from typing import List # noqa: F401 from typing import Optional @@ -91,15 +93,15 @@ def __init__( level=logging.DEBUG if debug else logging.INFO ) self.__debug = debug + self.__argv = argv if argv is not None else [] # Setup logging self.__logger = launch.logging.get_logger('launch') + # Install signal handlers if not already installed, will raise if not # in main-thread, call manually in main-thread to avoid this. install_signal_handlers() - self.__argv = argv if argv is not None else [] - # Setup context and register a built-in event handler for bootstrapping. self.__context = LaunchContext(argv=self.__argv) self.__context.register_event_handler(OnIncludeLaunchDescription()) @@ -109,10 +111,6 @@ def __init__( self._entity_future_pairs = \ [] # type: List[Tuple[LaunchDescriptionEntity, asyncio.Future]] - # Used to prevent run() being called from multiple threads. - self.__running_lock = threading.Lock() - self.__running = False - # Used to allow asynchronous use of self.__loop_from_run_thread without # it being set to None by run() as it exits. self.__loop_from_run_thread_lock = threading.RLock() @@ -176,6 +174,116 @@ def _is_idle(self): number_of_entity_future_pairs += self._prune_and_count_context_completion_futures() return number_of_entity_future_pairs == 0 and self.__context._event_queue.empty() + @contextlib.contextmanager + def _prepare_run_loop(self, loop): + try: + # Acquire the lock and initialize the loop. + with self.__loop_from_run_thread_lock: + if self.__loop_from_run_thread is not None: + raise RuntimeError( + 'LaunchService cannot be run multiple times concurrently.' + ) + this_loop = asyncio.get_event_loop() if loop is None else loop + + global _g_loops_used + _g_loops_used.add(this_loop) + + if self.__debug: + this_loop.set_debug(True) + + # Set the asyncio loop for the context. + self.__context._set_asyncio_loop(this_loop) + # Recreate the event queue to ensure the same event loop is being used. + new_queue = asyncio.Queue(loop=this_loop) + while True: + try: + new_queue.put_nowait(self.__context._event_queue.get_nowait()) + except asyncio.QueueEmpty: + break + self.__context._event_queue = new_queue + + self.__loop_from_run_thread = this_loop + + # Get current task. + this_task = asyncio.Task.current_task(this_loop) + + # Setup custom signal handlers for SIGINT, SIGTERM and maybe SIGQUIT. + sigint_received = False + + def _on_sigint(signum, frame, prev_handler): + # Ignore additional signals until we finish processing this one. + current_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) + if current_handler is signal.SIG_IGN: + # This function has been called re-entrantly. + return + nonlocal sigint_received + base_msg = 'user interrupted with ctrl-c (SIGINT)' + if not sigint_received: + self.__logger.warning(base_msg) + ret = self._shutdown( + reason='ctrl-c (SIGINT)', due_to_sigint=True, force_sync=True + ) + assert ret is None, ret + sigint_received = True + else: + self.__logger.warning('{} again, ignoring...'.format(base_msg)) + if callable(prev_handler): + try: + # Run pre-existing signal handler. + prev_handler(signum, frame) + except KeyboardInterrupt: + # Ignore exception. + pass + # Restore current signal handler (not necessarily this one). + signal.signal(signal.SIGINT, current_handler) + + on_sigint(_on_sigint) + + def _on_sigterm(signum, frame, prev_handler): + # Ignore additional signals until we finish processing this one. + current_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN) + if current_handler is signal.SIG_IGN: + # This function has been called re-entrantly. + return + # TODO(wjwwood): try to terminate running subprocesses before exiting. + self.__logger.error('using SIGTERM or SIGQUIT can result in orphaned processes') + self.__logger.error('make sure no processes launched are still running') + this_loop.call_soon(this_task.cancel) + if callable(prev_handler): + # Run pre-existing signal handler. + prev_handler(signum, frame) + # Restore current signal handler (not necessarily this one). + signal.signal(signal.SIGTERM, current_handler) + + on_sigterm(_on_sigterm) + + def _on_sigquit(signum, frame, prev_handler): + # Ignore additional signals until we finish processing this one. + current_handler = signal.signal(signal.SIGQUIT, signal.SIG_IGN) + if current_handler is signal.SIG_IGN: + # This function has been called re-entrantly. + return + self.__logger.error('user interrupted with ctrl-\\ (SIGQUIT), terminating...') + _on_sigterm(signum, frame, prev_handler) + # Restore current signal handler (not necessarily this one). + signal.signal(signal.SIGQUIT, current_handler) + + on_sigquit(_on_sigquit) + + # Yield asyncio loop and current task. + yield self.__loop_from_run_thread, this_task + finally: + # Unset the signal handlers while not running. + on_sigint(None) + on_sigterm(None) + on_sigquit(None) + + # No matter what happens, unset the loop. + with self.__loop_from_run_thread_lock: + self.__context._set_asyncio_loop(None) + self.__loop_from_run_thread = None + self.__shutting_down = False + async def _process_one_event(self) -> None: next_event = await self.__context._event_queue.get() await self.__process_event(next_event) @@ -207,187 +315,112 @@ async def __process_event(self, event: Event) -> None: # 'launch.LaunchService', # "processing event: '{}' x '{}'".format(event, event_handler)) - async def __run_loop(self) -> None: - while True: - # Check if we're idle, i.e. no on-going entities (actions) or events in the queue - is_idle = self._is_idle() # self._entity_future_pairs is pruned here - if not self.__shutting_down and self.__shutdown_when_idle and is_idle: - coroutine = self._shutdown(reason='idle', due_to_sigint=False) - await coroutine - continue - - if self.__loop_from_run_thread is None: - raise RuntimeError('__loop_from_run_thread unexpectedly None') - process_one_event_task = self.__loop_from_run_thread.create_task( - self._process_one_event()) - if self.__shutting_down: - # If shutting down and idle then we're done. - if is_idle: - process_one_event_task.cancel() - return - else: - entity_futures = [pair[1] for pair in self._entity_future_pairs] - entity_futures.append(process_one_event_task) - entity_futures.extend(self.__context._completion_futures) - done = set() # type: Set[asyncio.Future] - while not done: - done, pending = await asyncio.wait( - entity_futures, - loop=self.__loop_from_run_thread, - timeout=1.0, - return_when=asyncio.FIRST_COMPLETED) - if not done: - self.__logger.debug( - 'still waiting on futures: {}'.format(entity_futures) - ) - else: - await process_one_event_task - - def run(self, *, shutdown_when_idle=True) -> int: + async def run_async(self, *, shutdown_when_idle=True, loop=None) -> int: """ - Start the event loop and visit all entities of all included LaunchDescriptions. - - This should only ever be run from a single thread. + Visit all entities of all included LaunchDescription instances asynchronously. - :param: shutdown_when_idle if True (default), the service will shutdown when idle - """ - # Make sure this has not been called in multiple threads. - with self.__running_lock: - if self.__running: - raise RuntimeError( - 'LaunchService.run() called from multiple threads concurrently.') - self.__running = True + This should only ever be run from the main thread and not concurrently with other + asynchronous runs. - self.__return_code = 0 # reset the return_code for this run() - self.__shutdown_when_idle = shutdown_when_idle + Note that custom signal handlers are set, and KeyboardInterrupt is caught and ignored + around the original signal handler. After the run ends, this behavior is undone. - # Acquire the lock and initialize the asyncio loop. - with self.__loop_from_run_thread_lock: - self.__loop_from_run_thread = osrf_pycommon.process_utils.get_loop() - if self.__loop_from_run_thread is None: - raise RuntimeError('__loop_from_run_thread unexpectedly None') - global _g_loops_used - _g_loops_used.add(self.__loop_from_run_thread) - if self.__debug: - self.__loop_from_run_thread.set_debug(True) + :param: shutdown_when_idle if True (default), the service will shutdown when idle. + """ + # Make sure this has not been called from any thread but the main thread. + if threading.current_thread() is not threading.main_thread(): + raise RuntimeError( + 'LaunchService can only be run in the main thread.' + ) + + return_code = 0 + with self._prepare_run_loop(loop) as (this_loop, this_task): + # Log logging configuration details. + launch.logging.log_launch_config(logger=self.__logger) # Setup the exception handler to make sure we return non-0 when there are errors. - def exception_handler(loop, context): - self.__return_code = 1 + def _on_exception(loop, context): + nonlocal return_code + return_code = 1 return loop.default_exception_handler(context) - self.__loop_from_run_thread.set_exception_handler(exception_handler) + this_loop.set_exception_handler(_on_exception) - # Set the asyncio loop for the context. - self.__context._set_asyncio_loop(self.__loop_from_run_thread) - # Recreate the event queue to ensure the same event loop is being used. - new_queue = asyncio.Queue(loop=self.__loop_from_run_thread) while True: try: - new_queue.put_nowait(self.__context._event_queue.get_nowait()) - except asyncio.QueueEmpty: - break - self.__context._event_queue = new_queue - - # Run the asyncio loop over the main coroutine that processes events. - try: - sigint_received = False - run_loop_task = self.__loop_from_run_thread.create_task(self.__run_loop()) - - # Log logging configuration details. - launch.logging.log_launch_config(logger=self.__logger) - - # Setup custom signal hanlders for SIGINT, SIGQUIT, and SIGTERM. - def _on_sigint(signum, frame): - # Ignore additional interrupts until we finish processing this one. - prev_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) - if prev_handler is signal.SIG_IGN: - # This function has been called re-entrantly. - return - - nonlocal sigint_received - base_msg = 'user interrupted with ctrl-c (SIGINT)' - if not sigint_received: - self.__logger.warning(base_msg) - ret = self._shutdown( - reason='ctrl-c (SIGINT)', due_to_sigint=True, force_sync=True) - assert ret is None, ret - sigint_received = True - else: - - self.__logger.warning('{} again, ignoring...'.format(base_msg)) - - signal.signal(signal.SIGINT, prev_handler) - - def _on_sigterm(signum, frame): - # Ignore additional signals until we finish processing this one. - prev_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN) - if prev_handler is signal.SIG_IGN: - # This function has been called re-entrantly. - return - - # TODO(wjwwood): try to terminate running subprocesses before exiting. - self.__logger.error('using SIGTERM or SIGQUIT can result in orphaned processes') - self.__logger.error('make sure no processes launched are still running') - nonlocal run_loop_task - self.__loop_from_run_thread.call_soon_threadsafe(run_loop_task.cancel) - - signal.signal(signal.SIGTERM, prev_handler) - - def _on_sigquit(signum, frame): - # Ignore additional signals until we finish processing this one. - prev_handler = signal.signal(signal.SIGQUIT, signal.SIG_IGN) - if prev_handler is signal.SIG_IGN: - # This function has been called re-entrantly. - return - - self.__logger.error('user interrupted with ctrl-\\ (SIGQUIT), terminating...') - _on_sigterm(signum, frame) - - signal.signal(signal.SIGQUIT, prev_handler) - - on_sigint(_on_sigint) - on_sigterm(_on_sigterm) - on_sigquit(_on_sigquit) - - while not run_loop_task.done(): - try: - self.__loop_from_run_thread.run_until_complete(run_loop_task) + # Check if we're idle, i.e. no on-going entities (actions) or events in + # the queue + is_idle = self._is_idle() # self._entity_future_pairs is pruned here + if not self.__shutting_down and shutdown_when_idle and is_idle: + ret = await self._shutdown(reason='idle', due_to_sigint=False) + assert ret is None, ret + continue + process_one_event_task = this_loop.create_task(self._process_one_event()) + if self.__shutting_down: + # If shutting down and idle then we're done. + if is_idle: + process_one_event_task.cancel() + break + entity_futures = [pair[1] for pair in self._entity_future_pairs] + entity_futures.append(process_one_event_task) + entity_futures.extend(self.__context._completion_futures) + done = set() # type: Set[asyncio.Future] + while not done: + done, pending = await asyncio.wait( + entity_futures, + loop=this_loop, + timeout=1.0, + return_when=asyncio.FIRST_COMPLETED + ) + if not done: + self.__logger.debug( + 'still waiting on futures: {}'.format(entity_futures) + ) + else: + await process_one_event_task except KeyboardInterrupt: - pass + continue except asyncio.CancelledError: - self.__logger.error('asyncio run loop was canceled') + self.__logger.error('run task was canceled') + return_code = 1 + break except Exception as exc: msg = 'Caught exception in launch (see debug for traceback): {}'.format(exc) self.__logger.debug(traceback.format_exc()) self.__logger.error(msg) - self.__return_code = 1 - ret = self._shutdown(reason=msg, due_to_sigint=False, force_sync=True) + ret = await self._shutdown(reason=msg, due_to_sigint=False) assert ret is None, ret - # restart run loop to let it shutdown properly - run_loop_task = self.__loop_from_run_thread.create_task(self.__run_loop()) - finally: - # No matter what happens, unset the loop and set running to false. - with self.__loop_from_run_thread_lock: - self.__shutting_down = False - self.__loop_from_run_thread = None - self.__context._set_asyncio_loop(None) + return_code = 1 + # keep running to let things shutdown properly + continue + return return_code - # Unset the signal handlers while not running. - on_sigint(None) - on_sigterm(None) - on_sigquit(None) + def run(self, *, shutdown_when_idle=True) -> int: + """ + Run an event loop and visit all entities of all included LaunchDescription instances. - with self.__running_lock: - self.__running = False + This should only ever be run from the main thread and not concurrently with + asynchronous runs (see `run_async()` documentation). - return self.__return_code + :param: shutdown_when_idle if True (default), the service will shutdown when idle + """ + # Make sure this has not been called from any thread but the main thread. + if threading.current_thread() is not threading.main_thread(): + raise RuntimeError( + 'LaunchService can only be run in the main thread.' + ) + loop = osrf_pycommon.process_utils.get_loop() + run_async_task = loop.create_task(self.run_async( + shutdown_when_idle=shutdown_when_idle, loop=loop + )) + loop.run_until_complete(run_async_task) + return run_async_task.result() def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeActionsType]: self.__shutting_down = True self.__context._set_is_shutdown(True) return None - def _shutdown(self, *, reason, due_to_sigint, force_sync=False): + def _shutdown(self, *, reason, due_to_sigint, force_sync=False) -> Optional[Coroutine]: # Assumption is that this method is only called when running. retval = None if not self.__shutting_down: @@ -411,20 +444,24 @@ def _shutdown(self, *, reason, due_to_sigint, force_sync=False): self.__context._set_is_shutdown(True) return retval - def shutdown(self) -> None: + def shutdown(self, force_sync=False) -> Optional[Coroutine]: """ Shutdown all on-going activities and then stop the asyncio run loop. - This will cause LaunchService.run() to eventually exit. + This will cause the running LaunchService to eventually exit. + + Does nothing if the LaunchService is not running. - Does nothing if LaunchService.run() is not running in another thread. + This will return an awaitable coroutine if called from within the loop. This method is thread-safe. """ with self.__loop_from_run_thread_lock: if self.__loop_from_run_thread is not None: - ret = self._shutdown(reason='LaunchService.shutdown() called', due_to_sigint=False) - assert ret is None, ret + return self._shutdown( + reason='LaunchService.shutdown() called', + due_to_sigint=False, force_sync=force_sync + ) @property def context(self): diff --git a/launch/launch/utilities/signal_management.py b/launch/launch/utilities/signal_management.py index fd2a759fc..333e3bc88 100644 --- a/launch/launch/utilities/signal_management.py +++ b/launch/launch/utilities/signal_management.py @@ -32,10 +32,6 @@ def on_sigint(handler): Set the signal handler to be called on SIGINT. Pass None for no custom handler. - Note that if a custom handler is set (anything other than None is given), - then KeyboardInterrupt is caught and ignored around the original signal - handler (the once captured when install_signal_handlers() was called). - Passing None for handler will undo this behavior. install_signal_handlers() must have been called in the main thread before. It is called automatically by the constructor of `launch.LaunchService`. @@ -114,8 +110,8 @@ def install_signal_handlers(): def __on_sigint(signum, frame): if callable(__custom_sigint_handler): - __custom_sigint_handler(signum, frame) - if callable(__original_sigint_handler): + __custom_sigint_handler(signum, frame, __original_sigint_handler) + elif callable(__original_sigint_handler): __original_sigint_handler(signum, frame) if platform.system() != 'Windows': @@ -124,14 +120,14 @@ def __on_sigint(signum, frame): def __on_sigquit(signum, frame): if callable(__custom_sigquit_handler): - __custom_sigquit_handler(signum, frame) - if callable(__original_sigquit_handler): + __custom_sigquit_handler(signum, frame, __original_sigquit_handler) + elif callable(__original_sigquit_handler): __original_sigquit_handler(signum, frame) def __on_sigterm(signum, frame): if callable(__custom_sigterm_handler): - __custom_sigterm_handler(signum, frame) - if callable(__original_sigterm_handler): + __custom_sigterm_handler(signum, frame, __original_sigterm_handler) + elif callable(__original_sigterm_handler): __original_sigterm_handler(signum, frame) # signals must be registered in the main thread, but print a nicer message if we're not there diff --git a/launch/test/launch/test_launch_service.py b/launch/test/launch/test_launch_service.py index 4c60c7c4f..6102ae791 100644 --- a/launch/test/launch/test_launch_service.py +++ b/launch/test/launch/test_launch_service.py @@ -14,21 +14,14 @@ """Tests for the LaunchService class.""" +import asyncio +import osrf_pycommon import queue import threading from launch import LaunchDescription from launch import LaunchService from launch.events import ExecutionComplete -from launch.utilities import install_signal_handlers - -# Install the signal handlers here, in the hope that this is executed in the -# main-thread. -# If this is not the main-thread, a ValueError will be raised. -# See the docs for this function for more info. -# This would ensure that the custom handlers that the LaunchService needs can -# be installed later from other threads. -install_signal_handlers() def test_launch_service_constructors(): @@ -72,19 +65,19 @@ class MockEvent: assert ls._LaunchService__context._event_queue.qsize() == 2 assert handled_events.qsize() == 0 - t = threading.Thread(target=ls.run, kwargs={'shutdown_when_idle': False}) - t.start() + def perform_test_sequence(): + # First event (after including description of event handler). + handled_events.get(block=True, timeout=5.0) + # Emit and then check for a second event. + ls.emit_event(MockEvent()) + handled_events.get(block=True, timeout=5.0) + # Shutdown (generates a third event) and join the thread. + ls.shutdown() - # First event (after including description of event handler). - handled_events.get(block=True, timeout=5.0) + threading.Thread(target=perform_test_sequence).start() - # Emit and then check for a second event. - ls.emit_event(MockEvent()) - handled_events.get(block=True, timeout=5.0) + assert ls.run(shutdown_when_idle=False) == 0 - # Shutdown (generates a third event) and join the thread. - ls.shutdown() - t.join() # Check that the shutdown event was handled. handled_events.get(block=False) @@ -93,4 +86,13 @@ class MockEvent: assert handled_events.qsize() == 0 assert ls.run(shutdown_when_idle=True) == 0 + # Check that the mock and shutdown events were handled. + assert handled_events.qsize() == 2 + handled_events.get(block=False) + handled_events.get(block=False) + + loop = osrf_pycommon.process_utils.get_loop() + assert loop.run_until_complete(ls.run_async(shutdown_when_idle=True)) == 0 + # Check that the shutdown events was handled. + assert handled_events.qsize() == 1 handled_events.get(block=False) From d18ddea6374c1fc479207685b33ac7efb5a4b67b Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Tue, 9 Apr 2019 15:37:17 -0300 Subject: [PATCH 2/3] Address peer review comments. Signed-off-by: Michel Hidalgo --- launch/test/launch/test_launch_service.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/launch/test/launch/test_launch_service.py b/launch/test/launch/test_launch_service.py index 6102ae791..e475ef75f 100644 --- a/launch/test/launch/test_launch_service.py +++ b/launch/test/launch/test_launch_service.py @@ -14,8 +14,6 @@ """Tests for the LaunchService class.""" -import asyncio -import osrf_pycommon import queue import threading @@ -23,6 +21,8 @@ from launch import LaunchService from launch.events import ExecutionComplete +import osrf_pycommon + def test_launch_service_constructors(): """Test the constructors for LaunchService class.""" @@ -65,19 +65,25 @@ class MockEvent: assert ls._LaunchService__context._event_queue.qsize() == 2 assert handled_events.qsize() == 0 + # Spin up a background thread for testing purposes. def perform_test_sequence(): # First event (after including description of event handler). handled_events.get(block=True, timeout=5.0) # Emit and then check for a second event. ls.emit_event(MockEvent()) handled_events.get(block=True, timeout=5.0) - # Shutdown (generates a third event) and join the thread. + # Shutdown (generates a third event). ls.shutdown() - threading.Thread(target=perform_test_sequence).start() + t = threading.Thread(target=perform_test_sequence) + t.start() + # Run the launch service. assert ls.run(shutdown_when_idle=False) == 0 + # Join background thread if still running. + t.join() + # Check that the shutdown event was handled. handled_events.get(block=False) From 9b16ec3c6b3214f307b162e297f64426341d842a Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Tue, 13 Aug 2019 14:43:05 -0300 Subject: [PATCH 3/3] Remove duplicate thread check. Signed-off-by: Michel Hidalgo --- launch/launch/launch_service.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/launch/launch/launch_service.py b/launch/launch/launch_service.py index eaaa087a9..86847af43 100644 --- a/launch/launch/launch_service.py +++ b/launch/launch/launch_service.py @@ -403,11 +403,6 @@ def run(self, *, shutdown_when_idle=True) -> int: :param: shutdown_when_idle if True (default), the service will shutdown when idle """ - # Make sure this has not been called from any thread but the main thread. - if threading.current_thread() is not threading.main_thread(): - raise RuntimeError( - 'LaunchService can only be run in the main thread.' - ) loop = osrf_pycommon.process_utils.get_loop() run_async_task = loop.create_task(self.run_async( shutdown_when_idle=shutdown_when_idle, loop=loop