diff --git a/capnp/helpers/capabilityHelper.h b/capnp/helpers/capabilityHelper.h index 3aabbba..7d614e4 100644 --- a/capnp/helpers/capabilityHelper.h +++ b/capnp/helpers/capabilityHelper.h @@ -75,10 +75,19 @@ class PythonInterfaceDynamicImpl final: public capnp::DynamicCapability::Server kj::Own py_server; kj::Own kj_loop; +#if (CAPNP_VERSION_MAJOR < 1) PythonInterfaceDynamicImpl(capnp::InterfaceSchema & schema, kj::Own _py_server, kj::Own kj_loop) - : capnp::DynamicCapability::Server(schema), py_server(kj::mv(_py_server)), kj_loop(kj::mv(kj_loop)) { } + : capnp::DynamicCapability::Server(schema), + py_server(kj::mv(_py_server)), kj_loop(kj::mv(kj_loop)) { } +#else + PythonInterfaceDynamicImpl(capnp::InterfaceSchema & schema, + kj::Own _py_server, + kj::Own kj_loop) + : capnp::DynamicCapability::Server(schema, { true }), + py_server(kj::mv(_py_server)), kj_loop(kj::mv(kj_loop)) { } +#endif ~PythonInterfaceDynamicImpl() { } @@ -87,6 +96,12 @@ class PythonInterfaceDynamicImpl final: public capnp::DynamicCapability::Server capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> context); }; +inline void allowCancellation(capnp::CallContext context) { +#if (CAPNP_VERSION_MAJOR < 1) + context.allowCancellation(); +#endif +} + class PyAsyncIoStream: public kj::AsyncIoStream { public: kj::Own protocol; diff --git a/capnp/helpers/helpers.pxd b/capnp/helpers/helpers.pxd index dde61f0..4fce7f6 100644 --- a/capnp/helpers/helpers.pxd +++ b/capnp/helpers/helpers.pxd @@ -1,7 +1,7 @@ from capnp.includes.capnp_cpp cimport ( Maybe, PyPromise, VoidPromise, RemotePromise, - DynamicCapability, InterfaceSchema, EnumSchema, StructSchema, DynamicValue, Capability, - RpcSystem, MessageBuilder, Own, PyRefCounter, Node, DynamicStruct + DynamicCapability, InterfaceSchema, EnumSchema, StructSchema, DynamicValue, Capability, + RpcSystem, MessageBuilder, Own, PyRefCounter, Node, DynamicStruct, CallContext ) from capnp.includes.schema_cpp cimport ByteArray @@ -22,6 +22,7 @@ cdef extern from "capnp/helpers/capabilityHelper.h": PyPromise convert_to_pypromise(RemotePromise) PyPromise convert_to_pypromise(VoidPromise) VoidPromise taskToPromise(Own[PyRefCounter] coroutine, PyObject* callback) + void allowCancellation(CallContext context) except +reraise_kj_exception nogil void init_capnp_api() cdef extern from "capnp/helpers/rpcHelper.h": diff --git a/capnp/lib/capnp.pyx b/capnp/lib/capnp.pyx index e824b2e..2f8c0bb 100644 --- a/capnp/lib/capnp.pyx +++ b/capnp/lib/capnp.pyx @@ -1901,7 +1901,7 @@ async def kj_loop(): async def run(coro): """Ensure that the coroutine runs while the KJ event loop is running - + This is a shortcut for wrapping the coroutine in a :py:meth:`capnp.kj_loop` context manager. :param coro: Coroutine to run @@ -1923,6 +1923,7 @@ cdef class _CallContext: cdef CallContext * thisptr cdef _init(self, CallContext other): + helpers.allowCancellation(other) self.thisptr = new CallContext(move(other)) return self diff --git a/test/test_capability.py b/test/test_capability.py index da732b2..243c23f 100644 --- a/test/test_capability.py +++ b/test/test_capability.py @@ -1,4 +1,5 @@ import pytest +import asyncio import capnp import test_capability_capnp as capability @@ -377,3 +378,22 @@ async def test_generic(): obj = capnp._MallocMessageBuilder().get_root_as_any() obj.set_as_text("anypointer_") assert (await client.foo(obj)).b == "anypointer_test" + + +class CancelServer(capability.TestInterface.Server): + def __init__(self, val=1): + self.val = val + + async def foo(self, i, j, **kwargs): + with pytest.raises(asyncio.CancelledError): + await asyncio.sleep(10) + + +async def test_cancel2(): + client = capability.TestInterface._new_client(CancelServer()) + + task = asyncio.ensure_future(client.foo(1, True)) + await asyncio.sleep(0) # Make sure that the task runs + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task