Skip to content

Commit

Permalink
Merge pull request #3540 from wyfo/coroutine
Browse files Browse the repository at this point in the history
feat: support `async fn` in macros with coroutine implementation
  • Loading branch information
davidhewitt authored Nov 22, 2023
2 parents 3f0dfa9 + 627841f commit 69870d2
Show file tree
Hide file tree
Showing 20 changed files with 474 additions and 61 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ unindent = { version = "0.2.1", optional = true }
# support crate for multiple-pymethods feature
inventory = { version = "0.3.0", optional = true }

# coroutine implementation
futures-util = "0.3"

# crate integrations that can be added using the eponymous features
anyhow = { version = "1.0", optional = true }
chrono = { version = "0.4.25", default-features = false, optional = true }
Expand All @@ -54,6 +57,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.61"
rayon = "1.6.1"
widestring = "0.5.1"
futures = "0.3.28"

[build-dependencies]
pyo3-build-config = { path = "pyo3-build-config", version = "0.21.0-dev", features = ["resolve-config"] }
Expand Down
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- [Conversion traits](conversions/traits.md)]
- [Python exceptions](exception.md)
- [Calling Python from Rust](python_from_rust.md)
- [Using `async` and `await`](async-await.md)
- [GIL, mutability and object types](types.md)
- [Parallelism](parallelism.md)
- [Debugging](debugging.md)
Expand Down
78 changes: 78 additions & 0 deletions guide/src/async-await.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Using `async` and `await`

*This feature is still in active development. See [the related issue](https://github.com/PyO3/pyo3/issues/1632).*

`#[pyfunction]` and `#[pymethods]` attributes also support `async fn`.

```rust
# #![allow(dead_code)]
use std::{thread, time::Duration};
use futures::channel::oneshot;
use pyo3::prelude::*;

#[pyfunction]
async fn sleep(seconds: f64, result: Option<PyObject>) -> Option<PyObject> {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_secs_f64(seconds));
tx.send(()).unwrap();
});
rx.await.unwrap();
result
}
```

*Python awaitables instantiated with this method can only be awaited in *asyncio* context. Other Python async runtime may be supported in the future.*

## `Send + 'static` constraint

Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object.

As a consequence, `async fn` parameters and return types must also be `Send + 'static`, so it is not possible to have a signature like `async fn does_not_compile(arg: &PyAny, py: Python<'_>) -> &PyAny`.

It also means that methods cannot use `&self`/`&mut self`, *but this restriction should be dropped in the future.*


## Implicit GIL holding

Even if it is not possible to pass a `py: Python<'_>` parameter to `async fn`, the GIL is still held during the execution of the future – it's also the case for regular `fn` without `Python<'_>`/`&PyAny` parameter, yet the GIL is held.

It is still possible to get a `Python` marker using [`Python::with_gil`]({{#PYO3_DOCS_URL}}/pyo3/struct.Python.html#method.with_gil); because `with_gil` is reentrant and optimized, the cost will be negligible.

## Release the GIL across `.await`

There is currently no simple way to release the GIL when awaiting a future, *but solutions are currently in development*.

Here is the advised workaround for now:

```rust,ignore
use std::{future::Future, pin::{Pin, pin}, task::{Context, Poll}};
use pyo3::prelude::*;
struct AllowThreads<F>(F);
impl<F> Future for AllowThreads<F>
where
F: Future + Unpin + Send,
F::Output: Send,
{
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let waker = cx.waker();
Python::with_gil(|gil| {
gil.allow_threads(|| pin!(&mut self.0).poll(&mut Context::from_waker(waker)))
})
}
}
```

## Cancellation

*To be implemented*

## The `Coroutine` type

To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine). Each `coroutine.send` call is translated to `Future::poll` call, while `coroutine.throw` call reraise the exception *(this behavior will be configurable with cancellation support)*.

*The type does not yet have a public constructor until the design is finalized.*
2 changes: 2 additions & 0 deletions guide/src/ecosystem/async-await.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Using `async` and `await`

*`async`/`await` support is currently being integrated in PyO3. See the [dedicated documentation](../async-await.md)*

If you are working with a Python library that makes use of async functions or wish to provide
Python bindings for an async Rust library, [`pyo3-asyncio`](https://github.com/awestlake87/pyo3-asyncio)
likely has the tools you need. It provides conversions between async functions in both Python and
Expand Down
1 change: 1 addition & 0 deletions newsfragments/3540.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support `async fn` in macros with coroutine implementation
8 changes: 7 additions & 1 deletion pyo3-macros-backend/src/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ pub struct FnSpec<'a> {
pub output: syn::Type,
pub convention: CallingConvention,
pub text_signature: Option<TextSignatureAttribute>,
pub asyncness: Option<syn::Token![async]>,
pub unsafety: Option<syn::Token![unsafe]>,
pub deprecations: Deprecations,
}
Expand Down Expand Up @@ -319,6 +320,7 @@ impl<'a> FnSpec<'a> {
signature,
output: ty,
text_signature,
asyncness: sig.asyncness,
unsafety: sig.unsafety,
deprecations,
})
Expand Down Expand Up @@ -447,7 +449,11 @@ impl<'a> FnSpec<'a> {
let func_name = &self.name;

let rust_call = |args: Vec<TokenStream>| {
quotes::map_result_into_ptr(quotes::ok_wrap(quote! { function(#self_arg #(#args),*) }))
let mut call = quote! { function(#self_arg #(#args),*) };
if self.asyncness.is_some() {
call = quote! { _pyo3::impl_::coroutine::wrap_future(#call) };
}
quotes::map_result_into_ptr(quotes::ok_wrap(call))
};

let rust_name = if let Some(cls) = cls {
Expand Down
5 changes: 2 additions & 3 deletions pyo3-macros-backend/src/pyfunction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
deprecations::Deprecations,
method::{self, CallingConvention, FnArg},
pymethod::check_generic,
utils::{ensure_not_async_fn, get_pyo3_crate},
utils::get_pyo3_crate,
};
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
Expand Down Expand Up @@ -179,8 +179,6 @@ pub fn impl_wrap_pyfunction(
options: PyFunctionOptions,
) -> syn::Result<TokenStream> {
check_generic(&func.sig)?;
ensure_not_async_fn(&func.sig)?;

let PyFunctionOptions {
pass_module,
name,
Expand Down Expand Up @@ -231,6 +229,7 @@ pub fn impl_wrap_pyfunction(
signature,
output: ty,
text_signature,
asyncness: func.sig.asyncness,
unsafety: func.sig.unsafety,
deprecations: Deprecations::new(),
};
Expand Down
3 changes: 1 addition & 2 deletions pyo3-macros-backend/src/pymethod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::borrow::Cow;

use crate::attributes::{NameAttribute, RenamingRule};
use crate::method::{CallingConvention, ExtractErrorMode};
use crate::utils::{ensure_not_async_fn, PythonDoc};
use crate::utils::PythonDoc;
use crate::{
method::{FnArg, FnSpec, FnType, SelfType},
pyfunction::PyFunctionOptions,
Expand Down Expand Up @@ -188,7 +188,6 @@ pub fn gen_py_method(
options: PyFunctionOptions,
) -> Result<GeneratedPyMethod> {
check_generic(sig)?;
ensure_not_async_fn(sig)?;
ensure_function_options_valid(&options)?;
let method = PyMethod::parse(sig, meth_attrs, options)?;
let spec = &method.spec;
Expand Down
13 changes: 1 addition & 12 deletions pyo3-macros-backend/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use proc_macro2::{Span, TokenStream};
use quote::ToTokens;
use syn::{punctuated::Punctuated, spanned::Spanned, Token};
use syn::{punctuated::Punctuated, Token};

use crate::attributes::{CrateAttribute, RenamingRule};

Expand Down Expand Up @@ -137,17 +137,6 @@ impl quote::ToTokens for PythonDoc {
}
}

pub fn ensure_not_async_fn(sig: &syn::Signature) -> syn::Result<()> {
if let Some(asyncness) = &sig.asyncness {
bail_spanned!(
asyncness.span() => "`async fn` is not yet supported for Python functions.\n\n\
Additional crates such as `pyo3-asyncio` can be used to integrate async Rust and \
Python. For more information, see https://github.com/PyO3/pyo3/issues/1632"
);
};
Ok(())
}

pub fn unwrap_ty_group(mut ty: &syn::Type) -> &syn::Type {
while let syn::Type::Group(g) = ty {
ty = &*g.elem;
Expand Down
137 changes: 137 additions & 0 deletions src/coroutine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//! Python coroutine implementation, used notably when wrapping `async fn`
//! with `#[pyfunction]`/`#[pymethods]`.
use std::{
any::Any,
future::Future,
panic,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use futures_util::FutureExt;
use pyo3_macros::{pyclass, pymethods};

use crate::{
coroutine::waker::AsyncioWaker,
exceptions::{PyRuntimeError, PyStopIteration},
panic::PanicException,
pyclass::IterNextOutput,
types::PyIterator,
IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python,
};

mod waker;

const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine";

type FutureOutput = Result<PyResult<PyObject>, Box<dyn Any + Send>>;

/// Python coroutine wrapping a [`Future`].
#[pyclass(crate = "crate")]
pub struct Coroutine {
future: Option<Pin<Box<dyn Future<Output = FutureOutput> + Send>>>,
waker: Option<Arc<AsyncioWaker>>,
}

impl Coroutine {
/// Wrap a future into a Python coroutine.
///
/// Coroutine `send` polls the wrapped future, ignoring the value passed
/// (should always be `None` anyway).
///
/// `Coroutine `throw` drop the wrapped future and reraise the exception passed
pub(crate) fn from_future<F, T, E>(future: F) -> Self
where
F: Future<Output = Result<T, E>> + Send + 'static,
T: IntoPy<PyObject>,
PyErr: From<E>,
{
let wrap = async move {
let obj = future.await?;
// SAFETY: GIL is acquired when future is polled (see `Coroutine::poll`)
Ok(obj.into_py(unsafe { Python::assume_gil_acquired() }))
};
Self {
future: Some(Box::pin(panic::AssertUnwindSafe(wrap).catch_unwind())),
waker: None,
}
}

fn poll(
&mut self,
py: Python<'_>,
throw: Option<PyObject>,
) -> PyResult<IterNextOutput<PyObject, PyObject>> {
// raise if the coroutine has already been run to completion
let future_rs = match self.future {
Some(ref mut fut) => fut,
None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)),
};
// reraise thrown exception it
if let Some(exc) = throw {
self.close();
return Err(PyErr::from_value(exc.as_ref(py)));
}
// create a new waker, or try to reset it in place
if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) {
waker.reset();
} else {
self.waker = Some(Arc::new(AsyncioWaker::new()));
}
let waker = futures_util::task::waker(self.waker.clone().unwrap());
// poll the Rust future and forward its results if ready
if let Poll::Ready(res) = future_rs.as_mut().poll(&mut Context::from_waker(&waker)) {
self.close();
return match res {
Ok(res) => Ok(IterNextOutput::Return(res?)),
Err(err) => Err(PanicException::from_panic_payload(err)),
};
}
// otherwise, initialize the waker `asyncio.Future`
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? {
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__`
// and will yield itself if its result has not been set in polling above
if let Some(future) = PyIterator::from_object(future).unwrap().next() {
// future has not been leaked into Python for now, and Rust code can only call
// `set_result(None)` in `ArcWake` implementation, so it's safe to unwrap
return Ok(IterNextOutput::Yield(future.unwrap().into()));
}
}
// if waker has been waken during future polling, this is roughly equivalent to
// `await asyncio.sleep(0)`, so just yield `None`.
Ok(IterNextOutput::Yield(py.None().into()))
}
}

pub(crate) fn iter_result(result: IterNextOutput<PyObject, PyObject>) -> PyResult<PyObject> {
match result {
IterNextOutput::Yield(ob) => Ok(ob),
IterNextOutput::Return(ob) => Err(PyStopIteration::new_err(ob)),
}
}

#[pymethods(crate = "crate")]
impl Coroutine {
fn send(&mut self, py: Python<'_>, _value: &PyAny) -> PyResult<PyObject> {
iter_result(self.poll(py, None)?)
}

fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> {
iter_result(self.poll(py, Some(exc))?)
}

fn close(&mut self) {
// the Rust future is dropped, and the field set to `None`
// to indicate the coroutine has been run to completion
drop(self.future.take());
}

fn __await__(self_: Py<Self>) -> Py<Self> {
self_
}

fn __next__(&mut self, py: Python<'_>) -> PyResult<IterNextOutput<PyObject, PyObject>> {
self.poll(py, None)
}
}
Loading

0 comments on commit 69870d2

Please sign in to comment.