[Internal] Fix the bug that Ctrl+C doesn't return flow result. (#3174)
# Description

Fix the bug that Ctrl+C doesn't return the flow result: the signal
handler raised an exception, which asyncio.run cannot handle
correctly.

Note that for py<3.11, asyncio.run does not install a SIGINT handler
when running an async function, so we add one for py<3.11
following python's implementation:
https://github.com/python/cpython/blob/46c808172fd3148e3397234b23674bf70734fb55/Lib/asyncio/runners.py#L150
That logic was introduced in this PR:
python/cpython#32105

This pull request changes
`src/promptflow-core/promptflow/_utils/async_utils.py` to improve how
asynchronous tasks respond to SIGINT, and makes minor modifications to
three other files in the `src/promptflow-core/promptflow/` directory.

Here are the key changes:

### Signal Handling and Asynchronous Task Management:

*
[`src/promptflow-core/promptflow/_utils/async_utils.py`](diffhunk://#diff-ebca5e7d6a45f737be917f296f2fb0a3480fa26bc378bde80f11368c9ac2c4fbR7-R8):
A new class `_AsyncTaskSigIntHandler` was added to handle SIGINT signals
during the execution of asynchronous tasks. This class cancels the
current task if a SIGINT signal is received, which is particularly
useful for Python versions less than 3.11 where the default cancelling
behavior is not supported. The function
`_invoke_async_with_sigint_handler` was also added to invoke
asynchronous functions with the SIGINT handler. The function
`async_run_allowing_running_loop` was modified to use this new function.
[[1]](diffhunk://#diff-ebca5e7d6a45f737be917f296f2fb0a3480fa26bc378bde80f11368c9ac2c4fbR7-R8)
[[2]](diffhunk://#diff-ebca5e7d6a45f737be917f296f2fb0a3480fa26bc378bde80f11368c9ac2c4fbR27-R79)
[[3]](diffhunk://#diff-ebca5e7d6a45f737be917f296f2fb0a3480fa26bc378bde80f11368c9ac2c4fbL39-R94)
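
The cancel-on-first-interrupt behavior described above can be tried without sending a real signal. The following is a minimal sketch, not the code from this commit: `SigIntCancelSketch`, `slow_flow`, and `demo` are hypothetical names, and the handler is invoked directly instead of being registered with `signal.signal`.

```python
import asyncio
import signal


class SigIntCancelSketch:
    """Hypothetical stand-in for the handler described above."""

    def __init__(self, task, loop):
        self._task = task
        self._loop = loop
        self._interrupt_count = 0

    def on_sigint(self, signum, frame):
        self._interrupt_count += 1
        if self._interrupt_count == 1 and not self._task.done():
            # First interrupt: request cooperative cancellation.
            self._task.cancel()
            # Wake the loop so it notices the cancellation promptly.
            self._loop.call_soon_threadsafe(lambda: None)
            return
        # Any later interrupt falls back to the usual exception.
        raise KeyboardInterrupt()


async def slow_flow(events):
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        events.append("cancelled")
        raise


async def demo():
    events = []
    loop = asyncio.get_running_loop()
    task = asyncio.create_task(slow_flow(events))
    handler = SigIntCancelSketch(task, loop)
    await asyncio.sleep(0)  # let the task start and reach its sleep
    handler.on_sigint(signal.SIGINT, None)  # simulated first Ctrl+C
    try:
        await task
    except asyncio.CancelledError:
        events.append("task ended")
    try:
        handler.on_sigint(signal.SIGINT, None)  # simulated second Ctrl+C
    except KeyboardInterrupt:
        events.append("keyboard interrupt")
    return events


RESULT = asyncio.run(demo())
```

The first simulated interrupt reaches the coroutine as `asyncio.CancelledError`, which gives it a chance to clean up; only a repeated interrupt raises `KeyboardInterrupt`.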

### Minor Changes in Other Files:

*
[`src/promptflow-core/promptflow/_utils/process_utils.py`](diffhunk://#diff-50bb354dd49c977ba18f8fa38059525658153f097ef6f808f4226cff4d7a8891L32-R32):
The signal handler for SIGINT was changed to
`signal.default_int_handler` in the `block_terminate_signal_to_parent`
function.
*
[`src/promptflow-core/promptflow/executor/_async_nodes_scheduler.py`](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901L9):
The import of the `signal` module was removed. In the `execute` method,
the signal handler registration was removed and the exception handling
was modified to include `asyncio.CancelledError`. The `signal_handler`
function was removed and replaced with the `cancel` method.
[[1]](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901L9)
[[2]](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901L49-R59)
[[3]](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901R77-R81)
[[4]](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901L174-L183)
*
[`src/promptflow-core/promptflow/executor/flow_executor.py`](diffhunk://#diff-bec06607cb28fd791b8ed11bb488979344ca342be5f1c67ba6dd663d5e12240fL1096-R1100):
In the `_exec_async` method, the exception handling was modified to
handle `asyncio.CancelledError` instead of `KeyboardInterrupt`.

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.

---------

Co-authored-by: Heyi <heta@microsoft.com>
thy09 and Heyi authored May 10, 2024
1 parent 6288dbc commit b3257cc
Showing 4 changed files with 77 additions and 31 deletions.
58 changes: 57 additions & 1 deletion src/promptflow-core/promptflow/_utils/async_utils.py
@@ -4,6 +4,8 @@

 import asyncio
 import functools
+import signal
+import threading

 from promptflow.tracing import ThreadPoolExecutorWithContext

@@ -22,6 +24,60 @@ def _has_running_loop() -> bool:
         return False


+class _AsyncTaskSigIntHandler:
+    """The handler to cancel the current task if SIGINT is received.
+    This is only for python<3.11 where the default cancelling behavior is not supported.
+    The code is similar to the python>=3.11 builtin implementation.
+    https://github.com/python/cpython/blob/46c808172fd3148e3397234b23674bf70734fb55/Lib/asyncio/runners.py#L150
+    """
+
+    def __init__(self, task: asyncio.Task, loop: asyncio.AbstractEventLoop):
+        self._loop = loop
+        self._task = task
+        self._interrupt_count = 0
+
+    def on_sigint(self, signum, frame):
+        self._interrupt_count += 1
+        if self._interrupt_count == 1 and not self._task.done():
+            self._task.cancel()
+            # This call_soon_threadsafe would schedule the call as soon as possible,
+            # it would force the event loop to wake up then handle the cancellation request.
+            # This is to avoid the loop blocking with long timeout.
+            self._loop.call_soon_threadsafe(lambda: None)
+            return
+        raise KeyboardInterrupt()
+
+
+async def _invoke_async_with_sigint_handler(async_func, *args, **kwargs):
+    """In python>=3.11, when sigint is hit,
+    asyncio.run in default cancel the running tasks before raising the KeyboardInterrupt,
+    this introduces the chance to handle the cancelled error.
+    So we have a similar implementation here so python<3.11 also have such feature.
+    https://github.com/python/cpython/blob/46c808172fd3148e3397234b23674bf70734fb55/Lib/asyncio/runners.py#L150
+    """
+    # For the scenario that we don't need to update sigint, just return.
+    # The scenarios include:
+    # For python >= 3.11, asyncio.run already updated the sigint for cancelling tasks.
+    # The user already has his own customized sigint.
+    # The current code is not in main thread.
+    if not _should_update_sigint():
+        return await async_func(*args, **kwargs)
+    try:
+        loop = asyncio.get_running_loop()
+        task = asyncio.create_task(async_func(*args, **kwargs))
+        signal.signal(signal.SIGINT, _AsyncTaskSigIntHandler(task, loop).on_sigint)
+        return await task
+    finally:
+        signal.signal(signal.SIGINT, signal.default_int_handler)
+
+
+def _should_update_sigint():
+    return (
+        threading.current_thread() is threading.main_thread()
+        and signal.getsignal(signal.SIGINT) is signal.default_int_handler
+    )
+
+
 def async_run_allowing_running_loop(async_func, *args, **kwargs):
     """Run an async function in a new thread, allowing the current thread to have a running event loop.
@@ -36,7 +92,7 @@ def async_run_allowing_running_loop(async_func, *args, **kwargs):
         with ThreadPoolExecutorWithContext() as executor:
             return executor.submit(lambda: asyncio.run(async_func(*args, **kwargs))).result()
     else:
-        return asyncio.run(async_func(*args, **kwargs))
+        return asyncio.run(_invoke_async_with_sigint_handler(async_func, *args, **kwargs))


 def async_to_sync(func):
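
The guard-then-restore pattern in `async_utils.py` (only touch SIGINT from the main thread, and only when no custom handler is installed) can be sketched as a context manager. This is an illustration under assumptions, not the commit's code: `temporary_sigint_handler` and `noop_handler` are hypothetical names, and unlike the diff, which resets to `signal.default_int_handler`, this sketch restores whatever handler was active before.

```python
import signal
import threading
from contextlib import contextmanager


def should_update_sigint():
    # Same two conditions as the guard in the diff: main thread only,
    # and only when nobody has replaced the default SIGINT handler.
    return (
        threading.current_thread() is threading.main_thread()
        and signal.getsignal(signal.SIGINT) is signal.default_int_handler
    )


@contextmanager
def temporary_sigint_handler(handler):
    """Hypothetical helper: install `handler` for the duration of the block."""
    if not should_update_sigint():
        yield False  # nothing installed, nothing to restore
        return
    previous = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, handler)
    try:
        yield True
    finally:
        signal.signal(signal.SIGINT, previous)


def noop_handler(signum, frame):
    pass


before = signal.getsignal(signal.SIGINT)
with temporary_sigint_handler(noop_handler) as installed:
    during = signal.getsignal(signal.SIGINT)
after = signal.getsignal(signal.SIGINT)
```

Checking the thread first matters because `signal.signal` may only be called from the main thread of the main interpreter and raises `ValueError` elsewhere.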
2 changes: 1 addition & 1 deletion src/promptflow-core/promptflow/_utils/process_utils.py
@@ -29,7 +29,7 @@ def block_terminate_signal_to_parent():
     signal.set_wakeup_fd(-1)

     signal.signal(signal.SIGTERM, signal.SIG_DFL)
-    signal.signal(signal.SIGINT, signal.SIG_DFL)
+    signal.signal(signal.SIGINT, signal.default_int_handler)


 def get_available_max_worker_count(logger: logging.Logger = bulk_logger):
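
The one-line change in `process_utils.py` matters because the two values behave differently: `signal.SIG_DFL` asks the OS to apply its default action for SIGINT (terminating the process, with no Python exception), while `signal.default_int_handler` is a Python-level handler that raises `KeyboardInterrupt`. A small sketch of the latter, calling the handler directly rather than delivering a signal (`call_default_int_handler` is a hypothetical name):

```python
import signal


def call_default_int_handler():
    # Invoke Python's default SIGINT handler by hand to observe its effect.
    try:
        signal.default_int_handler(signal.SIGINT, None)
    except KeyboardInterrupt:
        return "KeyboardInterrupt"
    return "no exception"


OUTCOME = call_default_int_handler()
```

Because `_should_update_sigint` compares the current handler with `signal.default_int_handler`, a process left on `SIG_DFL` would presumably never get the cancelling handler installed.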
37 changes: 15 additions & 22 deletions src/promptflow-core/promptflow/executor/_async_nodes_scheduler.py
@@ -6,7 +6,6 @@
 import contextvars
 import inspect
 import os
-import signal
 import threading
 import time
 import traceback
@@ -46,23 +45,18 @@ async def execute(
         inputs: Dict[str, Any],
         context: FlowExecutionContext,
     ) -> Tuple[dict, dict]:
-        # TODO: Provide cancel API
-        if threading.current_thread() is threading.main_thread():
-            signal.signal(signal.SIGINT, signal_handler)
-            signal.signal(signal.SIGTERM, signal_handler)
-        else:
-            flow_logger.info(
-                "Current thread is not main thread, skip signal handler registration in AsyncNodesScheduler."
-            )
-
         # Semaphore should be created in the loop, otherwise it will not work.
         loop = asyncio.get_running_loop()
         self._semaphore = asyncio.Semaphore(self._node_concurrency)
         if (interval := try_get_long_running_logging_interval(flow_logger, DEFAULT_TASK_LOGGING_INTERVAL)) is not None:
             monitor = ThreadWithContextVars(
                 target=monitor_long_running_coroutine,
                 args=(
-                    interval, loop, self._task_start_time, self._task_last_log_time, self._dag_manager_completed_event
+                    interval,
+                    loop,
+                    self._task_start_time,
+                    self._task_last_log_time,
+                    self._dag_manager_completed_event,
                 ),
                 daemon=True,
             )
@@ -80,7 +74,11 @@ async def execute(
         # This is because it will always call `executor.shutdown()` when exiting the `with` block.
         # Then the event loop will wait for all tasks to be completed before raising the cancellation error.
         # See reference: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor
-        outputs = await self._execute_with_thread_pool(executor, nodes, inputs, context)
+        try:
+            outputs = await self._execute_with_thread_pool(executor, nodes, inputs, context)
+        except asyncio.CancelledError:
+            await self.cancel()
+            raise
         executor.shutdown()
         return outputs

@@ -171,16 +169,11 @@ async def _sync_function_to_async_task(
         # The task will not be executed before calling create_task.
         return await asyncio.get_running_loop().run_in_executor(executor, context.invoke_tool, node, f, kwargs)

-
-def signal_handler(sig, frame):
-    """
-    Start a thread to monitor coroutines after receiving signal.
-    """
-    flow_logger.info(f"Received signal {sig}({signal.Signals(sig).name}), start coroutine monitor thread.")
-    loop = asyncio.get_running_loop()
-    monitor = ThreadWithContextVars(target=monitor_coroutine_after_cancellation, args=(loop,))
-    monitor.start()
-    raise KeyboardInterrupt
+    async def cancel(self):
+        flow_logger.info("Cancel requested, monitoring coroutines after cancellation.")
+        loop = asyncio.get_running_loop()
+        monitor = ThreadWithContextVars(target=monitor_coroutine_after_cancellation, args=(loop,))
+        monitor.start()


 def log_stack_recursively(task: asyncio.Task, elapse_time: float):
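
The scheduler change replaces a signal handler with a cancellation hook: catch `asyncio.CancelledError`, run cleanup, then re-raise so callers still see the cancellation. A minimal sketch of that shape, with hypothetical names (`SchedulerSketch`, `_run_nodes`) and a log list standing in for the monitor thread started by the real `cancel` method:

```python
import asyncio


class SchedulerSketch:
    """Hypothetical scheduler showing only the cancel-then-re-raise shape."""

    def __init__(self):
        self.log = []

    async def _run_nodes(self):
        await asyncio.sleep(60)  # stands in for long-running node execution
        return {"output": "done"}

    async def cancel(self):
        # The real method starts a monitoring thread; here we only record the call.
        self.log.append("cancel hook ran")

    async def execute(self):
        try:
            return await self._run_nodes()
        except asyncio.CancelledError:
            await self.cancel()
            raise  # propagate so the caller can end the run as cancelled


async def demo():
    scheduler = SchedulerSketch()
    task = asyncio.create_task(scheduler.execute())
    await asyncio.sleep(0)  # let execute() start awaiting
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        scheduler.log.append("caller saw CancelledError")
    return scheduler.log


LOG = asyncio.run(demo())
```

Re-raising keeps the task in the cancelled state, which is what lets the outer layer decide how to record the run.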
11 changes: 4 additions & 7 deletions src/promptflow-core/promptflow/executor/flow_executor.py
@@ -1098,15 +1098,12 @@ async def _exec_async(
                 context,
                 allow_generator_output,
             )
-        except KeyboardInterrupt as ex:
-            # Run will be cancelled when the process receives a SIGINT signal.
-            # KeyboardInterrupt will be raised after asyncio finishes its signal handling
-            # End run with the KeyboardInterrupt exception, so that its status will be Canceled
-            flow_logger.info("Received KeyboardInterrupt, cancel the run.")
-            # Update the run info of those running nodes to a canceled status.
+        except asyncio.CancelledError as ex:
+            flow_logger.info("Received cancelled error, cancel the run.")
             run_tracker.cancel_node_runs(run_id)
             run_tracker.end_run(line_run_id, ex=ex)
-            raise
+            if self._raise_ex:
+                raise
         except Exception as e:
             run_tracker.end_run(line_run_id, ex=e)
             if self._raise_ex:
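
A separate `except asyncio.CancelledError` clause is needed here because, since Python 3.8, `CancelledError` derives from `BaseException` and is therefore not caught by `except Exception`. The sketch below imitates the new control flow with a hypothetical `exec_line` coroutine and a `raise_ex` flag standing in for `self._raise_ex`; the status dictionary is an assumption made for illustration, not the run tracker's real data model.

```python
import asyncio


async def exec_line(raise_ex):
    status = {"status": "Running"}
    try:
        await asyncio.sleep(60)  # stands in for executing the flow
        status["status"] = "Completed"
    except asyncio.CancelledError:
        status["status"] = "Canceled"
        if raise_ex:
            raise
    except Exception:
        status["status"] = "Failed"
        if raise_ex:
            raise
    return status


async def cancel_and_collect(raise_ex):
    task = asyncio.create_task(exec_line(raise_ex))
    await asyncio.sleep(0)  # let the task start
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        return "CancelledError propagated"


SWALLOWED = asyncio.run(cancel_and_collect(raise_ex=False))
PROPAGATED = asyncio.run(cancel_and_collect(raise_ex=True))
```

When the flag is off, the coroutine absorbs the cancellation and still returns a result marked as canceled, which matches the goal stated in the commit title of returning a flow result after Ctrl+C.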