
Add "one obvious way" for implementing the common multiplexed request/response pattern #467

Open
njsmith opened this issue Mar 14, 2018 · 30 comments


@njsmith
Member

njsmith commented Mar 14, 2018

Here's a very common pattern: we have a connection to some kind of peer. Multiple tasks send requests via the connection, and wait for responses. One background task reads from the connection, and dispatches the responses back to the appropriate requesting tasks.

Examples include WAMP (I think @agronholm ran into this), @sscherfke's aiomas channels, this SO question from @lilydjwg, gRPC, any HTTP client library that wants to support HTTP/2, etc.

It's certainly possible to do this kind of thing in Trio, but there are enough subtleties, and it's a common enough pattern, that I think it's worth thinking it through once and working out the best solution, and then adding helpers to trio so it becomes The One Obvious Way To Do It. Also the docs should have an explicit tutorial about this. (Or maybe a tutorial will turn out to be sufficient.)

Some issues to consider:

I used to recommend that people directly use wait_task_rescheduled inside the await rpc_call(...) function, since conceptually it seems like exactly the right thing: it puts a task to sleep, waits for a single value/error to come back, and nudges the user to think about cancellation. BUT, in realistic cases this can create a race condition: after we send the request, it's theoretically possible for the response to come back before we can call wait_task_rescheduled, and then everything falls apart. The underlying problem here is that wait_task_rescheduled uses the task object itself as the key to wake it up again, and since tasks sleep many times over their lifetime, this creates an ambiguity where we can't tell which wakeup goes with which sleep unless we make sure that they're always in lockstep. So I guess one potential option would be to change the low-level sleep/wakeup APIs to use a different key, one that's unique for each sleep? There's nothing terribly magic about the task object, but it does have the advantage that it's an existing object that we have to stash in thread-local storage anyway, whereas creating a unique key for each sleep probably adds a tiny bit more overhead. OTOH, maybe it doesn't matter...?
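To make the race concrete, here's a rough sketch of that approach (rpc_call and _waiting_tasks are made-up names; the trio.hazmat spelling used elsewhere in this thread is trio.lowlevel in current Trio):

import trio
import outcome

class Connection:
    # assume self._stream and self._waiting_tasks = {} exist

    async def rpc_call(self, request_id, request_data):
        self._waiting_tasks[request_id] = trio.hazmat.current_task()
        await self._stream.send_all(request_data)
        # RACE: the bytes can already be on the wire while send_all is still
        # finishing up, so the reader task may decode the response and call
        # reschedule() before this task reaches wait_task_rescheduled() below,
        # i.e. before it is actually sleeping on this key.

        def abort_fn(raise_cancel):
            del self._waiting_tasks[request_id]
            return trio.hazmat.Abort.SUCCEEDED

        return await trio.hazmat.wait_task_rescheduled(abort_fn)

    # Reader task, for each decoded (request_id, result):
    #     task = self._waiting_tasks.pop(request_id)
    #     trio.hazmat.reschedule(task, outcome.Value(result))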

We don't really want something like Result.capture here, because we don't want to catch random exceptions that happen inside the reader task (especially things like Cancelled!!); we only want to create values/errors based on the data read from the remote peer. OTOH maybe taking a Result is fine and we just trust people to use the Error/Value constructors directly. Or since Result is in hazmat, maybe we want to (also) provide direct set_value/set_error methods so people don't have to know about it just to use this. [Edit: some of this doesn't apply anymore now that we've split off Result into its own library (renaming it to Outcome in the process).]

The "obvious" answer is a Future. But this isn't actually particularly closely matched to this use case: for example, a Future is multi-consumer, while in this case we know there's only one consumer. A Future is effectively an Event+Result; in some sense what we want here is something more like a single-use Queue+Result. Where this different particularly matters is cancellation: when we know there's only a single consumer, and it gets cancelled, that's different than if one of potentially multiple consumers gets cancelled.

Maybe we'll end up with a Future at the end of this after all, but I want to explore the larger space first, especially since we've all been brainwashed into seeing Futures as the solution to everything and it's sometimes hard to tell when it's actually true and when it's when-all-you-have-is-a-hammer syndrome. Also, maybe Futures are just the wrong level of abstraction entirely. Maybe we should be thinking of a piece of reusable code to capture this pattern and take care of all the plumbing, sort of like serve_listeners encapsulates a common pattern.

In general, cancellation is a big issue here.

In fact, cancellation is a problem even before we worry about handling the response. The most straightforward way to implement a generic RPC mechanism would be to have an API like await conn.run_remote(fn, *args), and then run_remote encodes the request, sends it on the underlying stream, and then reads the response.

On the requesting side, if you just do the straightforward thing like:

async def run_remote(self, fn, *args):
    async with self._send_lock:
        request_id = next(self._request_counter)
        request_data = self._encode_request(request_id, fn, *args)
        await self._stream.send_all(request_data)
    return await self._somehow_get_response(request_id)

Then you're already in trouble with respect to cancellation: if you get cancelled in the middle of send_all, then you might have sent only half your request, and now instead of cancelling a single request on this connection, you've just corrupted the connection entirely. That's no good at all.

One solution would be to use shielding, though it's still tricky: you do want the send_all to be cancellable in some cases (e.g. if the user hits control-C). Just not if the only thing being cancelled is this particular request. I guess you could use shielding, + make the receive loop close the stream if it's cancelled, and then #460 will kick in and cause send_all to exit early? Maybe you want all those other pieces anyway?
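A rough sketch of the shielding variant (same made-up names as above; current trio spells this trio.CancelScope(shield=True), older versions used trio.open_cancel_scope(shield=True)):

async def run_remote(self, fn, *args):
    async with self._send_lock:
        request_id = next(self._request_counter)
        request_data = self._encode_request(request_id, fn, *args)
        # Shield the write: cancelling *this* request must not be able to
        # stop halfway through send_all and corrupt the whole connection.
        with trio.CancelScope(shield=True):
            await self._stream.send_all(request_data)
    return await self._somehow_get_response(request_id)

If the receive loop closes the stream when it gets cancelled, the shielded send_all still exits promptly with an error instead of hanging, which is the #460 interaction mentioned above.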

When the receive loop is cancelled, you also want to immediately resolve all outstanding requests with some kind of ConnectionLost error.

Alternatively, we could add a second background task, for our send loop; by assumption, we already have a background task for our receive loop, so adding a second task doesn't change our user-visible API. Then our request function could be implemented as something like:

async def run_remote(self, fn, *args):
    request_id = next(self._request_counter)
    request_data = self._encode_request(request_id, fn, *args)
    await self._send_queue.put(request_data)
    return await self._somehow_get_response(request_id)

This nicely removes the lock and the original cancellation hazard: the only things Queue.put can do are return normally or raise an error, and in the former case it's always succeeded and in the latter case it's always a no-op.

However, this particular formulation assumes that our protocol is composed of totally independent messages, where for each message we can assign it an id, encode it, and then drop or re-order it, independently of all the others. For many protocols, that's not true -- maybe ids are constrained to be sequential, or the messages contain a sequence number, so you need to make sure that the calls to encode happen in the right order, and once you've encoded the message then you're committed to sending it. In these cases you'd want to put the encoding logic inside the sender task / inside the shield. And in the sender task case, this means you either need some way to get the request_id back from the sender task, or else the run_remote function needs to create an object representing the response and hand it off to the sender task, like:

async def run_remote(self, fn, *args):
    response = OneShotFuture()
    await self._send_queue.put((response, fn, args))
    return await response.get()  # Still todo: cancellation handling here
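For completeness, here's a minimal sketch of the matching sender task (made-up names, using the same Queue-style API as the snippets above; today this would be a memory channel). Note that the encoding and id assignment happen inside the sender task, which addresses the sequencing concern above:

async def _send_loop(self):
    # The only task that encodes and writes, so request ids and sequence
    # numbers are assigned in the same order the bytes hit the stream.
    while True:
        response, fn, args = await self._send_queue.get()
        request_id = next(self._request_counter)
        self._pending[request_id] = response  # the reader task resolves this later
        request_data = self._encode_request(request_id, fn, *args)
        await self._stream.send_all(request_data)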

OK. So we've actually sent our request. Now what happens if our request function gets cancelled? There are a few different semantics that might make sense.

If the operation is side-effect free, we might want to abandon it: let run_remote return immediately (raising Cancelled), and when/if a response does come back, throw it away. Alternatively, we might want to follow Trio's normal rules for cancellation, and wait for the response to come back regardless. (Since our normal rule is that cancelled means that the operation was actually cancelled and didn't happen, or at least didn't happen completely.)

Then there's a second, orthogonal question: do we somehow notify the remote peer of the cancellation? For example, WAMP's RPC system has a "cancel that RPC request" message, and if our remote peer is also using trio then it could straightforwardly convert this back into a trio cancellation on the remote side. But there are other protocols that don't have this feature.

I say these are orthogonal because they form a 2x2 square of plausible behaviors: you could abandon + don't notify, you could abandon + send a cancel request (as a courtesy -- you'll throw the response away in any case, but maybe the other side can save some processing), you could wait + don't notify (sorry, you're just stuck waiting, similar to run_sync_in_worker_thread), or you could wait + notify. In the last two cases, if a peer is totally locked-up and not responding, then you can still escape by closing the connection entirely / cancelling the nursery that's holding the receive task, but timeouts on individual calls will be ineffective.

This is all very tricky: you want timeouts to be reliably enforced without being at the whims of a remote peer, and you also want to know whether a cancelled call actually happened or not, and in a distributed system these are incompatible – when everything is working normally you can have both, but after a partition you have to accept that some requests are in an indeterminate state. So at some level we just have to expose this fundamental reality rather than try to fix it, but it's tricky to do. Maybe this should be configurable on individual calls? Maybe you need two timeouts, one for the "send a cancel" behavior and one for the "give up entirely" behavior? Maybe the "give up entirely" is more of a per-connection feature than a per-request feature, so you might want per-call soft cancellation, and then an underlying heartbeat timer at the connection level where if there's no data for X seconds then the connection closes? I notice I've started talking about these as "hard" and "soft" cancellation – maybe there's some connection to #147? And should there be a different error for "successful cancel, confirmed by the remote peer" versus "couldn't talk to the remote peer, I have no idea what happened"?

We can implement all these different options, but it may be more or less awkward depending on what overall architecture we use.

Yet another complication: streaming, as in HTTP/2 downloads or gRPC's streaming RPC features. Maybe this isn't really different and can be essentially modeled as request/response, because handling backpressure safely means that streams have to wait until the receiver opens its window (i.e., requests more data)? Of course in many cases you'll want some buffering, which basically means sending off the next request optimistically before we actually need it.

Question: how do the implementation patterns for queue protocols like AMQP or Kafka compare to this multiplexed request/response pattern?

@smurfix
Contributor

smurfix commented Mar 14, 2018

AMQP has some multi-part messages which must be transmitted back-to-back (per channel; a single TCP connection carries multiple channels). trio-amqp uses a per-channel send lock to enforce that. Waiting for the reply is useless when you're cancelled and there's no cancel-this-request message.

However, this only shifts the whole problem to the next-higher layer, i.e. the code that ends up doing request/response across AMQP (not an uncommon pattern).
AMQP doesn't report that a client has died (though one might be able to trick it into doing that, with some effort) so cancel messages are uncommon because they'd only fix part of the problem.

@sscherfke

sscherfke commented Mar 19, 2018

As I already said in the chat, I would strongly favor Event being used for this, because it would add relatively little new stuff to the API that users need to learn, and from my experience, Event would be the place where I would look for this kind of functionality.

Single-consumer vs. multi-consumer

Is this really important? If you have something like this:

async def send(self, msg):
    msg_id = next(self._msg_counter)
    waiter = Event()
    self._waiters[msg_id] = waiter
    data = self._encode_request(msg_id, msg)
    await self._stream.send_all(data)
    return await waiter.wait()

Then this invocation of send() is practically the only consumer of waiter, because it is not very easy for other tasks to get hold of the waiter instance and also wait() on it.

Cancellation

I don’t like the idea of using callbacks to do some cleanup after a cancellation, because the whole idea of trio is to get rid of callbacks. I also have no idea how we would send a cancellation message to the peer from a synchronous callback:

async def send(self, msg):
    msg_id = next(self._msg_counter)
    waiter = Event()
    self._waiters[msg_id] = waiter
    data = self._encode_request(msg_id, msg)
    await self._stream.send_all(data)

    def abort_fn(raise_cancel):
        del self._waiters[msg_id]
        # How to send sth. to the peer?
        return trio.hazmat.Abort.SUCCEEDED

    return await waiter.wait(abort_fn=abort_fn)  # hypothetical: Event.wait() takes no abort_fn

From my POV, using try ... except ... would be a better user experience:

async def send(self, msg):
    msg_id = next(self._msg_counter)
    waiter = Event()
    self._waiters[msg_id] = waiter
    data = self._encode_request(msg_id, msg)
    try:
        await self._stream.send_all(data)
        return await waiter.wait()
    except trio.Cancelled as cancel:
        del self._waiters[msg_id]
        try:
            await self._stream.send_all(f'Cancel {msg_id}!!!!'.encode())
            cancel.succeeded()  # hypothetical API
        except:  # This is kind of ugly …
            cancel.failed()  # hypothetical API

The problem with this approach is that the user can do anything in the except block, so the nursery might have to wait until the task is actually cancelled.

@njsmith
Member Author

njsmith commented Mar 20, 2018

We should also give some advice on good conventions for how to expose the setting-up-a-background-task part in your public API.

Maybe, supporting:

async with open_YOUR_TYPE(...) as connection:
    ...

as a convenient version that automatically opens its own nursery, and then also providing a lower-level API for those who need to e.g. open tons of connections and don't want to recursively nest tons of async withs:

conn = await YOUR_TYPE.connect(..., nursery=nursery)
...
await conn.aclose()

I'm actually not sure how useful the latter case really is, since usually if you're juggling a bunch of connections you're also creating tasks for them anyway, and each task containing a single async with is not a big deal?

@sscherfke

The latter case is quite useful if you want to create a large number of connections and want them to share the same nursery for performance reasons.

@njsmith
Member Author

njsmith commented Mar 20, 2018

I guess an example use case for the explicit nursery would be a fan-out proxy, where a task reads from one connection and then sends on 1,000 connections.

@sscherfke

Or a multi-agent system. :)

@smurfix
Contributor

smurfix commented Apr 10, 2018

Yes, I want a send task. Two reasons: (a) I can just use with move_on_after() in the loop to do a simple keepalive; (b) when a request gets cancelled I can push some sort of CancelRequest object to the send queue and let the send task handle the problem.

IMHO the cancelled task is … well, cancelled. It's finished. Its job is no longer to wait on anything. We don't have separate "soft" and "hard" timeouts and IMHO when we think we need them we should instead refactor the problem. In this case, we should encapsulate whatever clean-up action needs to be taken in that CancelRequest (let's make it an instance of a subclass of AbstractCancelRequest) and have the receiver call a method of that when either a cancel confirmation or the actual result arrives – or a hard receiver timeout occurs.

While this idea does smell like one of those evil callback things which Trio works quite hard to avoid, in this case I'd pay the price because it actually makes reasoning about the problem easier: it allows us to have a simple pattern which works for the easy fire-and-forget case as well as for requests that require ACIDish semantics.

@njsmith
Member Author

njsmith commented Apr 10, 2018

IMHO the cancelled task is … well, cancelled. It's finished. Its job is no longer to wait on anything

In general, Trio's principle is that once a function returns, then it's done, it's not running anymore. (With the narrow exception of functions where you explicitly pass in a nursery.) This is arguably the most foundational of Trio's design principles.

Following this principle, when you cancel any of trio's built-in operations, then they wait for the cancellation to complete before raising Cancelled. Cancelled means it's actually cancelled, not scheduled to be cancelled at some future point. In most cases that arise inside trio itself, this is pretty straightforward, because cancellation can be done synchronously. But this is why run_sync_in_worker_thread ignores cancellation by default (unless you specifically tell it that the operation is OK to abandon, which is a hack to make getaddrinfo work the way you expect – since getaddrinfo is totally side-effect-free it's impossible to tell whether it's still running after being cancelled). And if we use IOCP on Windows, then operations like socket.send can have asynchronous cancellation (you have to submit a request to the kernel to cancel the operation, and then later you find out whether it was actually cancelled); we would definitely not want to request the cancellation and then immediately raise Cancelled. (The desire to support stuff like this is part of why wait_task_rescheduled has such a complicated calling convention, with the argument that's a callback that takes an argument that's a callback.)

I can imagine that there are specific circumstances where people are using RPC systems in ways where they don't want these semantics, or want to relax them like we do for getaddrinfo. But we should certainly take the standard wait-for-operations-to-resolve-before-returning semantics into account when coming up with general recommendations.

@smurfix
Contributor

smurfix commented Apr 17, 2018

OK … so instead of an explicit callback, we use a class that represents an RPCish request and which has specific methods for the client to cancel things, and for the connection to feed back a result / cancel status / error / progress report(?). Whether the client waits for that result after canceling it, or indeed at all, would then be up to the client.

I should probably write a couple of ABCs for that (and use them in my AMQP package).

@takluyver
Contributor

Subscribing with an interest in two more protocols that work this way: Jupyter messaging, with request/reply on ZMQ sockets, and DBus, where request/reply on a Unix socket is mixed with some one-way messages.

@sscherfke

I have just read this discussion again and now I’m not sure whether the following diff (adding an outcome.Outcome to Event) changes anything about cancellation. You can continue to use Event as you did before, but if you want, you can pass a value or error to it and consume it once:

diff --git a/trio/_sync.py b/trio/_sync.py
index fe806dd..4bce812 100644
--- a/trio/_sync.py
+++ b/trio/_sync.py
@@ -30,6 +30,7 @@ class Event:

     _lot = attr.ib(default=attr.Factory(_core.ParkingLot), init=False)
     _flag = attr.ib(default=False, init=False)
+    _outcome = attr.ib(default=None, init=False)

     def is_set(self):
         """Return the current value of the internal flag.
@@ -44,6 +45,19 @@ class Event:
         """
         self._flag = True
         self._lot.unpark_all()
+        self._outcome = outcome.Value(None)
+
+    @_core.enable_ki_protection
+    def set_value(self, value):
+        self._flag = True
+        self._lot.unpark_all()
+        self._outcome = outcome.Value(value)
+
+    @_core.enable_ki_protection
+    def set_error(self, exception):
+        self._flag = True
+        self._lot.unpark_all()
+        self._outcome = outcome.Error(exception)

     def clear(self):
         """Set the internal flag value to False.
@@ -62,6 +76,7 @@ class Event:
             await _core.checkpoint()
         else:
             await self._lot.park()
+        return self._outcome.unwrap()

     def statistics(self):
         """Return an object containing debugging information.

@belm0
Member

belm0 commented Aug 12, 2018

Citing an example multiplexer by @nonsleepr. From my limited understanding this falls in the wait_task_rescheduled camp.

https://gist.github.com/nonsleepr/43420a0ce659f1b870544fffb9e5cda4

@decentral1se
Member

decentral1se commented Jan 17, 2020

I won't pretend to have grokked all points raised here but focussing on (see #467 (comment)):

our protocol is composed of totally independent messages, where for each message we can assign it an id, encode it, and then drop or re-order it, independently of all the others.

I tried to implement an example in a similar vein. Perhaps it is useful to see; I certainly struggled to get here and it needs more work! I worked on this because 1) I need this pattern in the code I'm playing with, but 2) I wasn't able to find any other example code of people doing mux/demux work with Trio (I see there are other examples mentioned above but I couldn't dig them out). Here it is: https://gist.github.com/decentral1se/42adc1f80f17655be6eb1f4a73ad7f0b.

I'm definitely curious to see what Trio can offer in terms of helpers for this pattern.

@takluyver
Contributor

I've drafted out a rough implementation for D-Bus, and I'd be interested in feedback. It's entirely possible that there are massive bugs that haven't occurred to me.
https://gitlab.com/takluyver/jeepney/blob/trio/jeepney/integrate/trio.py

  • I've kind of used memory channels as futures: each request opens a new channel and puts the sending end in a dict, then waits on the receiving end. This feels a bit clumsy, but I was trying to see if I could avoid touching hazmat.
  • D-Bus doesn't enforce a request-reply pattern, so there can be other messages mixed with the expected replies. I've tried to make a sensible compromise: the receiver won't consume data when no-one is ready to accept a message, but it can discard messages while looking for a specific reply.
  • I've layered the 'client' connection machinery, which uses a receiver task, over a 'plain' connection which implements the Channel interface, with no matching of replies to their requests.

@smurfix
Contributor

smurfix commented Feb 2, 2020

Futures: you don't need hazmat or channels. An Event object is sufficient. Your client can simply store the event in the awaiting_reply dict, then wait for it. The receiver reads the event, replaces it with the actual data, then sets the event. The client then pops the reply's data from awaiting_reply.

Bug: your read loop tries to read at most one message from the parser before awaiting more. Consider what happens when an incoming data block contains two or three DBus messages.

You don't want a _stop_receiver flag which you'd need to test. A better pattern is to open a cancel scope and store that someplace; to stop the receiver, simply cancel that. You should add an exception handler that sets all Event objects that remain in the awaiting_reply dict, in order not to deadlock your clients.
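Roughly (made-up names, and assuming the Event-in-a-dict scheme above):

async def _receive_loop(self):
    with trio.CancelScope() as self._receiver_scope:  # cancel this to stop the receiver
        try:
            while True:
                msg = await self._receive_one_message()
                waiter = self._awaiting_reply.get(msg.reply_to)
                if isinstance(waiter, trio.Event):
                    self._awaiting_reply[msg.reply_to] = msg  # swap the Event for the data
                    waiter.set()
        finally:
            # Wake every client that is still waiting, so nobody deadlocks on a
            # reply that can no longer arrive; they'll find no data and can
            # raise their own "connection lost" error.
            for waiter in list(self._awaiting_reply.values()):
                if isinstance(waiter, trio.Event):
                    waiter.set()

def stop_receiver(self):
    self._receiver_scope.cancel()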

Why do you think you need to suspend the receiver? And why can there possibly be unmatched replies?

@takluyver
Contributor

takluyver commented Feb 2, 2020

Thanks @smurfix! I'll reply here, but if there's more specific feedback about my code, let's take it to the Gitlab merge request so we don't generate noise for people who're interested in the more general question. I should have said this before. :-)

Edit: and if people don't have a Gitlab account, you're welcome to email me.

An Event object is sufficient. Your client can simply store the event in the awaiting_reply dict, then wait for it. The receiver reads the event, replaces it with the actual data, then sets the event. The client then pops the reply's data from awaiting_reply.

Interesting, I hadn't thought of that. It also feels a bit clumsy to use one dict for two-way communication, and with two different types, though. I'd feel a need for each side to have some kind of type check on the objects they get from the dict. So I'm not yet sure if I prefer this to the channels thing. Is there an advantage I'm not seeing?

Bug: your read loop tries to read at most one message from the parser before awaiting more. Consider what happens when an incoming data block contains two or three DBus messages.

I was trying to avoid that trap - so either I can't see how I've failed, or I've written something in a weird way. 😉 Are you looking at this loop or this one?

The former retrieves a single message at a time, and it always checks if the parser has a complete message ready before reading more data from the socket. The loop is to keep reading from the socket until a message is ready.

The latter should keep retrieving messages while there is anything to accept them. It waits on the _wake_receiver event each time, but that's only reset once there are no tasks left to accept messages.

You don't want a_stop_receiver flag which you'd need to test. A better pattern is to open a cancel scope and store that someplace; to stop the receiver, simply cancel that. You should add an exception handler that sets all Event objects that remain in the awaiting_reply dict, in order not to deadlock your clients.

Thanks, that does sound smarter. I'll look more at the trio docs and do that.

Why do you think you need to suspend the receiver?

My thinking is that if there's nothing ready to accept a message, it should apply backpressure rather than consuming and discarding them. This is kind of complicated by the mixture of replies and other messages.

And why can there possibly be unmatched replies?

D-Bus also has 'signal' messages, with a publish-subscribe pattern, which go over the same connections. And AFAIK the same connection sending requests can also receive requests and send replies - clients and servers are not distinguished. So I can't assume that incoming messages are all replies.

The current API also allows a request message to be sent without awaiting the reply, which is plausible if you're doing it for a side-effect such as displaying a notification. That would also lead to an unmatched reply.

@takluyver
Contributor

I've had another go at implementing this, and I wound up creating a trio 'Future'. I'm wary of implementing a primitive that trio itself doesn't provide, because I know you're thinking very carefully about good design, so its absence is probably for a good reason. But this is how I got there:

  1. My first attempt used memory channels kind of like futures - send a single value, then close the channel. But it feels like a code-smell to be implementing a simpler thing on top of a more complex thing, and it leaves should-never-happen possibilities hanging around - like closing the send end without sending anything, or sending multiple things.
  2. I looked at @nonsleepr's multiplexer implementation. This is kind of like an async dict, where you can wait for a key to be present. But I only want the producer to put values in if there's a task expecting to receive them.
  3. I tried implementing my own multiplexer with a dict of paused tasks - something like a ParkingLot with keys. But I can't strictly guarantee that the requesting task is always parked there before its reply turns up (although it's practically almost certain). So now I need a way to register that a task is going to wait for a specific ID, and a place to store results that arrive before their destination. I realise that I'm implementing 'dict of futures' without a future.
Initial Future implementation
import trio
from trio.hazmat import Abort, current_task, reschedule, wait_task_rescheduled


class Future:
    """A Future for trio.

    Is this a bad idea? Trio doesn't offer futures itself, but I couldn't find
    a neater way to achieve what I wanted.
    """
    def __init__(self):
        self._outcome = None
        self._task = None

    def set(self, outcome):
        self._outcome = outcome
        if self._task is not None:
            reschedule(self._task, outcome)

    async def get(self):
        if self._outcome is not None:
            await trio.hazmat.checkpoint()
            return self._outcome.unwrap()

        self._task = current_task()

        def abort_fn(_):
            self._task = None
            return Abort.SUCCEEDED

        return (await wait_task_rescheduled(abort_fn))

I remain open to suggestions for other ways of doing this. 🙂

@guilledk

Hey guys, I'm kinda late to this discussion as I just recently joined the trio community. Reading through this issue, I think this is the kind of thing I'm talking about in #1503

@decentral1se
Member

decentral1se commented May 10, 2020

This is the number 1 thing on my Trio wish list btw ❤️

@belm0
Member

belm0 commented May 10, 2020

This is kind of like an async dict, where you can wait for a key to be present. But I only want the producer to put values in if there's a task expecting to receive them.

trio-util AsyncDictionary has is_waiting()

@takluyver
Contributor

Thanks, good to know about AsyncDictionary. But I think using is_waiting() would have the same issue I hit trying to implement that myself: I haven't proved that the waiting task must get there before the reply it's expecting. So I want to store some kind of waiting placeholder before the request is sent.

@smurfix
Contributor

smurfix commented May 12, 2020

I added an issue for this: groove-x/trio-util#4

@takluyver
Contributor

Another point I was uncertain about trying to do this was exception types. The ReceiveChannel interface specifies a few different exceptions for cases like "you've closed the receiver already", "the sender end was closed", or "the channel broke". Should tasks waiting for replies get a similarly fine-grained set of exceptions? Or is "no reply coming" all they need to know?

@smurfix suggested while reviewing that the requester tasks don't need fine-grained exceptions, so long as the exception from the receiver task bubbles out.


@njsmith
Member Author

njsmith commented May 24, 2020

Here's a straw-man idea for an RPC multiplexer primitive we could imagine including in Trio, like Event and memory channels:

import attr
from typing import Any, AsyncIterable, Union

@attr.s(auto_attribs=True, frozen=True)
class StartEvent:
    message: Any
    handle: "RequestHandle"

@attr.s(auto_attribs=True, frozen=True)
class CancelEvent:
    handle: "RequestHandle"
    abandoned: bool

@attr.s
class RequestHandle:
    def reply(self, outcome):
        ...

    def cancel_succeeded(self):
        ...

class MuxRPC:
    # The initiator side; lots of tasks call this
    async def rpc(self, message, *, abandon_on_cancel: bool):
        ...

    # The backend side; only one task loops over this
    async def requests(self) -> AsyncIterable[Union[StartEvent, CancelEvent]]:
        ...

The idea is that the multiplexing backend does:

async for event in muxer.requests():
    ...

Then when a task wants to initiate an RPC, it does:

await muxer.rpc(message, abandon_on_cancel=...)

This triggers the arrival of a StartEvent in the backend, with the corresponding message and opaque handle. Presumably the backend then interprets the message part, encodes it into some request in the underlying protocol, and sends that, while also saving the handle object somewhere where it can find it later.

Later, when it gets a reply on the underlying protocol, it finds the appropriate handle object and calls reply. Then the corresponding call to rpc returns with the given outcome.

Cancellation: When an rpc call receives a cancellation request, then the backend gets a CancelEvent. If the underlying protocol has no way to signal cancellation, it should just ignore this. Otherwise, it should send a cancellation request on the underlying protocol. If the underlying protocol later reports that the cancellation was successful, it calls handle.cancel_succeeded(); if the cancel was too late and the operation succeeds anyway, it calls handle.reply(...) as usual.

In the mean time, the behavior of the rpc method depends on whether abandon_on_cancel was set to True or False. If it's True, then it queues the CancelEvent, but then immediately raises Cancelled without waiting for the actual reply; any eventual call to reply or cancel_succeeded will be ignored. If it's False, then it queues the CancelEvent and then resumes waiting.
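To make the shape of the backend concrete, here's a hypothetical sketch of a backend loop for this straw man (encode_request, encode_cancel, and the pending dict are invented here; replies are assumed to be resolved by a separate reader task):

import itertools

async def backend(muxer, stream, pending):
    # pending: dict mapping protocol-level request ids -> RequestHandle
    ids = itertools.count()
    async for event in muxer.requests():
        if isinstance(event, StartEvent):
            request_id = next(ids)
            pending[request_id] = event.handle
            await stream.send_all(encode_request(request_id, event.message))
        elif isinstance(event, CancelEvent):
            # Find the id we assigned earlier and, if the protocol supports it,
            # ask the peer to cancel; otherwise just ignore the event.
            for request_id, handle in pending.items():
                if handle is event.handle:
                    await stream.send_all(encode_cancel(request_id))
                    break

# A separate reader task would then do, for each decoded reply:
#     pending.pop(request_id).reply(outcome.Value(result))
# and for each confirmed cancellation:
#     pending.pop(request_id).cancel_succeeded()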

Open questions:

  • Tracking handle objects seems like a common annoyance; maybe we should have some built-in way to match up handles with backend-assigned, protocol-specific ids?

  • Backpressure? I guess there's not really any point; each call to rpc generates at most 2 messages, and each call causes one task to block, and we don't have any backpressure on task creation.

  • If an rpc call gets cancelled before the corresponding StartEvent is dequeued on the backend, should we just remove it from the queue and pretend it never happened?

  • Definite missing piece: let's say you have a nice well-behaved protocol that's generally reliable, low-latency, and allows cancellation to be transmitted remotely. So normally, you want to transmit cancellation and wait for the reply. But, you still need to be prepared for the case where the remote side just flakes out on you entirely. So like... you need to put a timeout on "transmit cancel and wait for reply", and how do you do that if it's already cancelled?

    Simple solution: instead of abandon_on_cancel: bool, make it abandon_on_cancel_timeout: float, where 0 means "abandon immediately", and otherwise it gives the timeout before the rpc gets abandoned.

    Is that sufficient, or are there cases where you need to abandon based on some other criteria besides a timeout? (I guess maybe you could handle those by manually calling cancel_succeeded on the handle?)

@takluyver
Contributor

It looks like this proposal relies on having a separate send task, which probably makes sense. I've so far avoided this in my prototype, but looking back at your initial post, it seems like there's a pretty good case for having one.

If an rpc call gets cancelled before the corresponding StartEvent is dequeued on the backend, should we just remove it from the queue and pretend it never happened?

Yes please - at least as an option. I don't want to send requests that have already been cancelled, and it doesn't look easy to implement that on top of this API.

Tracking handle objects seems like a common annoyance; maybe we should have some built-in way to match up handles with backend-assigned, protocol-specific ids?

I think this is important, because you also want to ensure that the stored handles are cleaned up if the requester task is cancelled. E.g. if there's a malfunctioning auto-retry behaviour, you don't want an ever increasing number of stored handles.

In a simple case like mine, that could be a weakref.WeakValueDictionary mapping IDs to handles, leaving Python responsible for cleaning up abandoned handles. Or some level of the requester task might do something like:

with muxer.request() as handle:
    send(msg, handle)
    await handle.result()
# Leaving the context cleans up the ID -> handle mapping

@oremanj
Member

oremanj commented May 26, 2020

@njsmith I like the simplicity of the approach you've proposed. It doesn't seem like it would be a good fit for protocols where the request or response is a stream rather than a message (like HTTP/2), but I suppose supporting both approaches with the same abstraction would probably make something that's not a great fit for either.

Backpressure? I guess there's not really any point; each call to rpc generates at most 2 messages, and each call causes one task to block, and we don't have any backpressure on task creation.

I don't fully understand how our lack of backpressure on task creation makes backpressure on RPCs useless; is the idea here that the number of pending RPC messages is at most a small multiple of the number of tasks? I can see two reasonable backpressure approaches:

  • Set a soft limit on the size of the queue of messages waiting to be processed by the backend task. StartEvents respect this limit but CancelEvents are allowed to exceed it. This is easy to implement (much more so than backpressuring the cancels would be), but cancels that affect lots of requests can cause thundering herd issues.
  • Set a limit on the number of outstanding requests.

But, you still need to be prepared for the case where the remote side just flakes out on you entirely. So like... you need to put a timeout on "transmit cancel and wait for reply", and how do you do that if it's already cancelled?

I'm having a hard time thinking of generally well-behaved protocols where you'd want to abandon a single request because it took too long to cancel. You might want to give up on the whole connection if the remote side "flakes out entirely", and one marker of that might be an unreasonable amount of time taken to respond to a cancellation. But these seem to me like parameters one would set on the whole MuxRPC object, not parameters on individual requests.

Another point: if you're going to abandon a particular request on cancellation no matter what, then you probably don't care about its outcome. If you want to wait some amount of time for the response to your cancellation, though, you potentially do care about the difference between "acknowledged as cancelled" and "no clue, gave up". It seems like the former would best be represented as trio.Cancelled and the latter as BrokenResourceError. This fits with my intuition that if you tear down the whole connection, all active requests should fail with BrokenResourceError.

If an rpc call gets cancelled before the corresponding StartEvent is dequeued on the backend, should we just remove it from the queue and pretend it never happened?

I think so -- who's to say it happened at all? This fits with "Cancelled means it never happened" as a general principle.

@njsmith
Member Author

njsmith commented May 27, 2020

It doesn't seem like it would be a good fit for protocols where the request or response is a stream rather than a message (like HTTP/2), but I suppose supporting both approaches with the same abstraction would probably make something that's not a great fit for either.

I don't know if it's a good fit or not. It is possible to handle streams in principle: for requests you could send an async iterable as part of your message (for a pull-style API), or you could model the request stream as a series of individual RPCs: handle = start(); handle.send_part(...); handle.send_part(...); .... And similarly, a streaming response can be modeled as a series of get_the_next_piece() RPCs – like how the iterator protocol works :-).

Maybe this is insufficient or awkward for some reason, but I'm not actually sure whether you can do better.
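For example, a streaming response under this model might look roughly like this (using the hypothetical MuxRPC.rpc() from above; the message format and end-of-stream convention are invented):

async def stream_response(muxer, stream_id):
    # Each chunk is pulled with its own RPC, so backpressure falls out
    # naturally: we only ask for the next piece once we actually want it.
    while True:
        chunk = await muxer.rpc(("get_next_piece", stream_id), abandon_on_cancel=False)
        if chunk is None:  # assume None signals end-of-stream
            return
        yield chunk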

Set a soft limit on the size of the queue of messages waiting to be processed by the backend task. StartEvents respect this limit but CancelEvents are allowed to exceed it. This is easy to implement (much moreso than backpressuring the cancels would be), but cancels that affect lots of requests can cause thundering herd issues.
Set a limit on the number of outstanding requests

OK, but what changes when the limit gets exceeded? Calls to await rpc(...) start to block until there's capacity to handle them? That's the same thing that happens if there's no limit and the RPC StartEvent always gets queued...

I'm having a hard time thinking of generally well-behaved protocols where you'd want to abandon a single request because it took too long to cancel. You might want to give up on the whole connection if the remote side "flakes out entirely", and one marker of that might be an unreasonable amount of time taken to respond to a cancellation. But these seem to me like parameters one would set on the whole MuxRPC object, not parameters on individual requests.

So basically this would mean that backend is responsible for detecting unresponsive peers and doing some kind of "hard" teardown? That seems reasonable, especially in a version 1.

I actually considered leaving out the abandon-on-cancel functionality entirely, and just saying that if that's what you want, your backend should respond to all CancelEvent messages by immediately reporting a successful cancel. But then I thought, what if the backend is experiencing backpressure from the remote peer, so it's not processing the message queue? Probably you still want the abandon to happen immediately.

If an rpc call gets cancelled before the corresponding StartEvent is dequeued on the backend, should we just remove it from the queue and pretend it never happened?

I think so -- who's to say it happened at all? This fits with "Cancelled means it never happened" as a general principle.

The one thing that makes me hesitate is that if a task's nursery gets cancelled before the task starts, then we still start the task, to allow safe handoff of resources. This feels kinda similar, because we're passing control to another context. OTOH trio.to_thread.run_sync does check for cancellation before invoking the function.

I guess what's special about nursery.start_soon is that it's a go-statement, so the parent can't use with blocks to manage the lifetime of resources that you're handing to the new task. For this, it's not a go-statement, so it's probably fine.

@arthur-tacca
Contributor

@takluyver I might have missed something, but it looks like there's a bug in your "Initial Future implementation". If the result is set before we wait on it then this line of code is run:

return self._outcome.unwrap()

So the outcome is unwrapped and the underlying result is returned. But if we wait on the future before the result is available, then we end up running these two lines:

# In Future.set():
reschedule(self._task, outcome)
# In Future.get():
return (await wait_task_rescheduled(abort_fn))

That effectively resolves to return outcome, so it isn't unwrapped so the whole outcome object is returned - I think.

@takluyver
Contributor

Yes, I think you're right. What I'm actually using in my code is now a simpler version that doesn't use hazmat - thanks to @ntninja for contributing this.

Simplified trio future
import trio
from outcome import Error, Value


class Future:
    """A very simple Future for trio based on `trio.Event`."""
    def __init__(self):
        self._outcome = None
        self._event = trio.Event()

    def set_result(self, result):
        self._outcome = Value(result)
        self._event.set()

    def set_exception(self, exc):
        self._outcome = Error(exc)
        self._event.set()

    async def get(self):
        await self._event.wait()
        return self._outcome.unwrap()

This is pretty basic, e.g. there's no protection against setting the same future twice.
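For reference, it slots into the request/response pattern roughly like this (names made up; the reader-task side is shown as comments):

async def call(self, msg):
    msg_id = next(self._msg_counter)
    fut = self._pending[msg_id] = Future()
    await self._send_channel.send(self._encode(msg_id, msg))
    try:
        return await fut.get()
    finally:
        self._pending.pop(msg_id, None)  # clean up even if we're cancelled

# Reader task, for each decoded reply:
#     self._pending[msg_id].set_result(result)    # or set_exception(exc)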
