Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Equivalent of asyncio.call_soon_threadsafe while still having explicit responibility for errors #991

Closed
0zeroth opened this issue Apr 5, 2019 · 9 comments

Comments

@0zeroth
Copy link

0zeroth commented Apr 5, 2019

Apologies for the lack of a reproducible example - I had trouble faking an API that was as badly behaved as the 3rd party API that I am stuck with! I'm hoping the following description will be sufficient to get some useful help. If not, I can chip a way at trying to reproduce the behaviour.

The stop function is badly behaved - it consistently causes the _callback_threadsafe to be called before it returns. How do we handle this in trio? (a detailed explanation follows)

import trio

class BadApi:
    """
    __init__ with a callback that will be called at arbitrary times from arbitrary threads

    start() must be called first, then AFTER callback('started') has been called,
    further calls can be made

    request(id, payload) will cause callback('response', id, response_payload) to be called

    stop() must be called, and then AFTER callback('stoppped') has been called we know all threads
    have been killed. (failing to call stop causes app to hang because threads are not dead)
    """

    def __init__(self, callback): ...
    def start(self): ...
    def stop(self): ...
    def request(self, id, payload): ...



class TrioApi:

    def __init__(self):
        self._api = BadApi(self._callback_threadsafe)
        self._started = trio.Event()
        self._stopped = trio.Event()
        self._portal = None

    async def start(self):
        self._portal = trio.BlockingTrioPortal()
        self._api.start()   # This returns BEFORE _callback_threadsafe executes on another thread
        await self._started.wait()

    async def stop(self):
        self._api.stop()    # This does not return - we end up stuck in _callback_threadsafe
        await self._stopped.wait()

    async def request(self, id, payload):
        # See https://github.com/python-trio/trio/issues/467
        #   Add "one obvious way" for implementing the common multiplexed request/response pattern
        pass

    def _callback_threadsafe(self, , *args):
        print('_callback_threadsafe')
        self._portal.run_sync(self._callback, *args)

    def _callback(self, msg_type, id=None, payload=None):
        print(f'_callback {msg_type}')
        if msg_type == 'started':
            self._started.set()
        elif msg_type == 'stopped':
            self._stopped.set()
        else:
            pass  # Handle the response


async def main():
    api = TrioApi()
    await api.start()
    await trio.sleep(1.)
    await api.stop()

trio.run(main)

# Output (with commentary):
#   _api.start()                     (trio thread)
#   _callback_threadsafe      (api thread)
#   _callback started            (trio thread)
#   _callback_threadsafe      (api thread) <-- here is the point we called stop() - _api.stop() never returned

The start function works as expected. The _api.start() function returns, and then at a later time, on another thread our _callback_threadsafe function is called and we schedule _callback across to the trio thread which sets our event.

The stop function is badly behaved - it consistently causes the _callback_threadsafe to be called before it returns, and will not return until _callback_threadsafe returns. So it decides to act synchronously despite being documented otherwise. As far as I can tell, this causes a deadlock because trio is waiting for Api.stop() to finish, which is waiting for BadApi.stop(), which is waiting for the _callback to finish, which is waiting for BlockingTrioPortal.run_sync to finish. But run_sync cannot finish, because trio never gets the chance to schedule _callback.

Having the BlockingTrioPortal block (the clue is in the name!) is consistent with the trio philosophy of not dropping errors: if we didn't block, how would we know if an error had occured in _callback? So, perhaps running the _callback is actually a type of task, and should be handed off to a nursery. I tried that: give the TrioApi a nursery object, and then use something like (pseudo-code, a little async needs to sprinkled into the TrioApi):

self._portal.run_sync(
    lambda: self._nursery.start_soon(self._callback, *args)

This did not work for me - and at this point I'm not sure if I've just got some silly mistake, or am going about this completely the wrong way.

My intention is palm responsibility for errors onto the nursery. call_soon should return immediately, allowing the _portal.run_sync to complete. Then the nursery can be configured to handle errors in a sensible way.

To sum up: how can we schedule a call on the trio thread in a similar way to asyncios call_soon_threadsafe, but staying within the philosophy of trio?

@0zeroth
Copy link
Author

0zeroth commented Apr 5, 2019

As a comment to my question: I'm reading with interest the tickets like #467. At least for me, wrapping existing api's is an extremely common task. A collection of simple tutorials along the lines of 'given an old-school API like this, here is what we consider best-practice to wrap it, with reasons why' would be an invaluable tool for at least two reasons:

  • people who actually need to wrap an api have a place to get started, and plenty of hints about issues they probably would not otherwise consider (eg my code above doesn't even consider cancellation, or treating the TrioApi like a task or maybe a context to ensure stop is always called)
  • To spread the word about what a good trio api should look like. Even if we're lucky enough to only need deal with trio primatives, we could still cobble together an api that behaved pretty badly!

I was hoping to turn my experiments with this API into some sort of tutorial - but since I fell at the first hurdle it seems I've got a bit of a way to go!!

@njsmith
Copy link
Member

njsmith commented Apr 5, 2019

The stop function is badly behaved - it consistently causes the _callback_threadsafe to be called before it returns, and will not return until _callback_threadsafe returns. So it decides to act synchronously despite being documented otherwise. As far as I can tell, this causes a deadlock because trio is waiting for Api.stop() to finish, which is waiting for BadApi.stop(), which is waiting for the _callback to finish, which is waiting for BlockingTrioPortal.run_sync to finish. But run_sync cannot finish, because trio never gets the chance to schedule _callback.

Let's see if I understand right: your theory is that

  1. From the Trio thread, in TrioApi.stop, you synchronously call self._api.stop()
  2. Then self._api.stop synchronously calls self._callback_threadsafe(...)
  3. Then self._callback_threadsafe synchronously calls self._portal.run_sync(self._callback, ...)
  4. And this deadlocks, because you're doing a synchronous call in the Trio thread, that's trying to re-enter the Trio thread

Did I follow right?

That would be a problem, yeah... and doing self._portal.run_sync(nursery.start_soon, ...) wouldn't help, because the deadlock happens as soon as you call self._portal.run_sync, so it wouldn't matter which callback you pass. Any call to portal.run or portal.run_sync from the main Trio thread would cause an instant deadlock, guaranteed.

The weird thing though, is that we thought of this, so run and run_sync actually check for this case, and raise an error if you try to call them from the main Trio thread:

In [1]: import trio                                                             

In [2]: async def deadlocker(): 
   ...:     portal = trio.BlockingTrioPortal()                                  
   ...:     portal.run_sync(lambda: None)                                       

In [3]: trio.run(deadlocker)
Traceback (most recent call last):
  [...]
  File "<stdin>", line 3, in deadlocker
  File "/home/njs/.user-python3.6/lib/python3.6/site-packages/trio/_threads.py", line 121, in run_sync
    return self._do_it(self._run_sync_cb, fn, *args)
  File "/home/njs/.user-python3.6/lib/python3.6/site-packages/trio/_threads.py", line 82, in _do_it
    "this is a blocking function; call it from a thread"
RuntimeError: this is a blocking function; call it from a thread

So I guess that's the first thing to figure out... why aren't you seeing this error? If self._api.stop() is really calling self._callback_threadsafe directly in the main Trio thread, then self._callback_threadsafe should be immediately raising RuntimeError. So... is your badly behaved API so badly behaved that when a callback raises an exception, it swallows the exception and deadlocks? Or is there something else going on, that causes the error to not be generated in the first place...?

I'd start by adding some more debugging to _callback_threadsafe. Maybe something like:

    def _callback_threadsafe(self, , *args):
        print('_callback_threadsafe', *args)
        print('in thread:', threading.current_thread())
        try:
            self._portal.run_sync(self._callback, *args)
        except:
            print("run_sync raised:", sys.exc_info())
            raise
        else:
            print("run_sync returned successfully")

...and then see what that says.

@0zeroth
Copy link
Author

0zeroth commented Apr 5, 2019

Not quite: self._callback_threadsafe(...) is called from another thread (call it the API thread) (I have all the logging you suggest in my production code, but stripped it to make the example readable). But it just so happens self._callback_threadsafe(...) is called consistently before self._api.stop() returns. So, as far as I can tell, the trio thread is stuck waiting for self._api.stop() to return, but the API thread is stuck waiting for portal.run_sync to finish (and it won't, because trio never gets the chance to schedule the function).

In addition, I tried putting the following call chain (the logger is configured to log the thread):

    def _callback_threadsafe(self, *args):
        self.log.debug('_callback_threadsafe')
        self._portal.run(self._callback_nursery, *args)
        self.log.debug('_callback_threadsafe finished')

    async def _callback_nursery(self, *args):
        await trio.sleep(0.)
        self.log.debug('_callback_nursery')
        self._nursery.start_soon(self._callback, *args)

    async def _callback(self, event: blpapi.Event, session: blpapi.Session):
        await trio.sleep(0.)
        self.log.debug('_callback')

In this case _callback_nursery never gets called - so it appears to be the same problem - the blocking portal seems to require trio has the opportunity to schedule the task on before it can complete.

@0zeroth
Copy link
Author

0zeroth commented Apr 5, 2019

I figured out a reproducible example:

import time
import trio
import threading
import logging
log = logging.getLogger('demo')

class BadApi:
    """
    __init__ with a callback that will be called at arbitrary times from arbitrary threads

    start() must be called first, then AFTER callback('started') has been called,
    further calls can be made

    request(id, payload) will cause callback('response', id, response_payload) to be called

    stop() must be called, and then AFTER callback('stoppped') has been called we know all threads
    have been killed. (failing to call stop causes app to hang because threads are not dead)
    """

    def __init__(self, callback):
        self._callback = callback

    def _do_callback(self, delay, args):
        time.sleep(delay)
        self._callback(*args)

    def start(self):
        # delay so we are sure this function finished first
        thread = threading.Thread(target=self._do_callback, args=(1., ('started',)))
        thread.start()

    def stop(self):
        thread = threading.Thread(target=self._do_callback, args=(0., ('stopped',)))
        thread.start()
        thread.join()  # require callback finishes first


class TrioApi:

    def __init__(self):
        self._api = BadApi(self._callback_threadsafe)
        self._started = trio.Event()
        self._stopped = trio.Event()
        self._portal = None

    async def start(self):
        self._portal = trio.BlockingTrioPortal()
        log.debug(f'_api.start()')
        self._api.start()   # This returns BEFORE _callback_threadsafe executes on another thread
        log.debug(f'_api.start() finished')
        await self._started.wait()

    async def stop(self):
        log.debug(f'_api.stop()')
        self._api.stop()    # This does not return - we end up stuck in _callback_threadsafe
        log.debug(f'_api.stop() finished')
        await self._stopped.wait()


    def _callback_threadsafe(self, *args):
        log.debug('_callback_threadsafe')
        self._portal.run_sync(self._callback, *args)

    def _callback(self, msg_type, id=None, payload=None):
        log.debug(f'_callback {msg_type}')
        if msg_type == 'started':
            self._started.set()
        elif msg_type == 'stopped':
            self._stopped.set()
        else:
            pass  # Handle the response


async def main():
    api = TrioApi()
    await api.start()
    await trio.sleep(1.)
    await api.stop()


logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(threadName)s %(name)s: %(message)s")
trio.run(main)

Output:

2019-04-05 14:57:06,649 [DEBUG] MainThread demo: _api.start()
2019-04-05 14:57:06,650 [DEBUG] MainThread demo: _api.start() finished
2019-04-05 14:57:07,650 [DEBUG] Thread-1 demo: _callback_threadsafe
2019-04-05 14:57:07,650 [DEBUG] MainThread demo: _callback started
2019-04-05 14:57:08,652 [DEBUG] MainThread demo: _api.stop()
2019-04-05 14:57:08,652 [DEBUG] Thread-2 demo: _callback_threadsafe

@njsmith
Copy link
Member

njsmith commented Apr 5, 2019

Ah, so when you call self._api.stop(), that delegates some worker thread to call the callback, but then self._api.stop() blocks the main thread waiting for the worker thread to report that the callback has completed? Oof, yeah, that would indeed subvert Trio's attempts to protect you from deadlock!

So FYI, you left in some type hints there that reveal the identity of BadApi – let me know if you need that stripped from the comment/comment history. This is very helpful for giving advice though :-). Assuming I have the right library, it looks like it actually provides two different methods for stopping the session. There's stop:

Stop operation of this session and block until all callbacks to EventHandler objects relating to this Session which are currently in progress have completed (including the callback to handle the SESSION_STATUS Event with SessionTerminated message this call generates).

And then there's stopAsync:

Begin the process to stop this Session and return immediately. The application must monitor events for a SESSION_STATUS Event with SessionTerminated message which will be generated once the Session has been stopped.

So it sounds like your problem is, right now your code is designed around the stopAsync semantics, but you're calling stop :-). So your options are:

  • If you keep using stop, which is documented as a blocking operation, then you should push it off into a worker thread to avoid blocking the main Trio thread: await trio.run_sync_in_worker_thread(self._api.stop). In this case you can remove the _stopped event, because the self._api.stop call won't return until after the shutdown is complete.

  • OR, you should use stopAsync, which is documented to return immediately, and then wait for the _stopped event like you are now.

Those docs also describe a distinction between start and startAsync that you might want to look into.

Also I should probably mention that I'm also available as a consultant for help and advice, in case you'd find it easier to show me actual code under NDA...

@0zeroth
Copy link
Author

0zeroth commented Apr 5, 2019

No, no secrets its the Bloomberg API, but that code isn't useful when asking for help as people can't run it without a paid up license! That's why I attempted to strip out the distraction... the only problem is you've run ahead and read the docs (thanks!!!) but in my prod code I am calling Bloombergs stopAsync and it behaves as per my reproducible example. I was also surprised - but digging into the asyncio wrapper I have it works the same, although asyncio doesn't care because call_soon_threadsafe returns immediately. My fault for not spending the time creating a minimal working example in the first instance.

So, I think the runnable toy example fully captures the problem, but maybe its too simple as it is deterministic. What I'm more concerned about is the general case. If an API will schedule the callback on another thread, we can't in general assume the triggering function will complete before the callback (or can we, correct me if I'm wrong, I would love to be wrong :) For example:

def bad_api_do_request():
    self._internal_do_request(); # This triggers the callback on another thread at some time in the future
    self._internal_do_bookeeping();  # What if this takes 1ms? 10ms? 100ms? 1000ms?

I might be missing the point, but even a 'non-blocking' function takes some time to complete, and during that time python may decide to schedule the other thread to do some work, so maybe we'll be unlucky and the callback will trigger.

I'm not worried about a single 'bad' function like my stop example - it can be worked around. But it seems to hint there is something broken in the pattern being used to wrap this api in the general case.

@0zeroth
Copy link
Author

0zeroth commented Apr 5, 2019

Ok - TrioToken.run_sync_soon was exactly what I was after. Apart from being a little hard to find (being in hazmat, which is understandable) the documentation was perfect. Below is a complete working example of wrapping a toy api that (I think) is representative of a real-life callback based API. What I really like is that any errors reliably crash the application. Despite my best efforts, I'm not confident my asyncio wrapped implementations of similar api's do that...

I also like the way I ended up being forced to inject a Nursery. I had been thinking it would make some sort of sense (suppose there were many different functions like request, they should do all their work in the same nursery, and by injecting it the user of TrioApi gets control) but I was going to 'do it later'. It's really nice that Trio forced me towards better design right from the beginning!

This example doesn't even consider cancellation, but if I update it I'll come back and add some edits. Hopefully someone finds this useful!

import random
import time

import trio
import trio.hazmat
import threading
import logging
log = logging.getLogger('demo')

class BadApi:
    """
    __init__ with a callback that will be called at arbitrary times from arbitrary threads

    start() must be called first, then AFTER callback('started') has been called,
    further calls can be made

    request(request_id, i) will cause
       multiple callback('partial_response', id, i) followed by a
       final callback('response', id, i)

    stop() must be called, and then AFTER callback('stoppped') has been called we know all threads
    have been killed. (failing to call stop causes app to hang because threads are not dead)
    """

    def __init__(self, callback):
        self._callback = callback
        self._started = False
        self.log = log.getChild('BadApi')

    def start(self):
        self.log.debug('start')
        def _reply():
            self.log.debug(f'_reply started')
            self._started = True
            self._callback('started')
            self.log.info('started')
        thread = threading.Thread(target=_reply)
        thread.start()

    def stop(self):
        # This implementation has a nasty gotcha: it is async in the sense that the callback happens on another
        # thread, but it just to happens that callback _always_ happens before this function returns
        # This mimics a real api: it is often the case that an API might do things synchronously if it is able
        self.log.debug('stop')
        assert self._started

        def _reply():
            self.log.debug(f'_reply stopped')
            self._callback('stopped')
            self.log.info('stopped')

        thread = threading.Thread(target=_reply)
        thread.start()
        thread.join()  # require callback finishes first

    def request(self, request_id, v):
        self.log.debug(f'request {request_id} {v}')
        assert isinstance(v, int)
        assert self._started

        def _reply():
            for i in range(0, v):
                self.log.debug(f'_reply partial_response {request_id} {i * v}')
                self._callback('partial_response', request_id, i * v)
                time.sleep(random.uniform(0.01, 1))
            self._callback('response', request_id, v * v)
            self.log.info(f'request {request_id} complete')

        thread = threading.Thread(target=_reply)
        thread.start()
        self.log.info(f'request {request_id} pending')

class TrioApi:

    def __init__(self):
        self._api = BadApi(self._callback_threadsafe)
        self._started = trio.Event()
        self._stopped = trio.Event()
        self._trio_token = None
        self._nursery = None
        self.log = log.getChild('TrioApi')

    async def start(self, nursery):
        self.log.debug('start')
        self._nursery = nursery
        self._trio_token = trio.hazmat.current_trio_token()
        self._api.start()  # self._callback_threadsafe will happen after this call returns.
        await self._started.wait()
        self.log.info('started')

    async def stop(self):
        self.log.debug('stop')
        self._api.stop()  # self._callback_threadsafe will happen before this call returns. Bad Api!
        await self._stopped.wait()
        self.log.info('stopped')

    async def request(self, v):
        self.log.debug(f'request {v}')
        send_channel, receive_channel = trio.open_memory_channel(10)

        def _callback(msg_type, r):
            # This happens in a trio system task, so if we don't have the buffer,
            # an exception will be raised and crash out of trio.run(...)
            # We could think about back-pressure if we thought BadApi would be kind to a slow consumer,
            # but it's not immediately obvious how we would create a blocking call to wait for the channel to be ready.
            self.log.debug(f'request _callback sending {r}')
            send_channel.send_nowait(r)
            if msg_type == 'response':
                self._nursery.start_soon(send_channel.aclose)
                self.log.info('request complete')

        self._api.request(_callback, v)
        self.log.info('request pending')

        # This looks like it would work, but doesn't, as async generators are 'too difficult'?
        # Not sure - there are is a lot written about it in the issue tracker.
        # To see the problem, uncomment the following code (and remove the return statement),
        # and try raising an error in the client code (see the SIMULATE_ERROR global)
        #
        #async with receive_channel:
        #    async for r in receive_channel:
        #        yield r

        return receive_channel

    def _callback_threadsafe(self, *args):
        # If we use the BlockingTrioPortal then this callback blocks if the user calls api.stop().
        # That is because there is a deadlock using that method: trio cannot run the callback until BadApi.stop()
        # returns, but BadApi.stop() won't return until the callback is run
        #
        # Using TrioToken.run_sync_soon solves that as the callback is placed in a queue and run_sync_soon returns
        # immediately. This lets api.stop() return, and then trio picks up the callback up off the queue and creates
        # a system task. Among other things, that means that errors are not propagated back to this function, instead
        # any errors will crash trio.run(...)
        #
        # In this case, that is _exactly_ what we want! Because BadApi does not want to know about errors in this
        # callback. It is probably going to drop them on the floor, or if we are lucky log them and continue, or maybe
        # it will crash... this way, we are explicitly saying self._callback will handle all errors, or we expect the
        # program to crash (so we are forced to fix the problem)
        self.log.debug(f'_callback_threadsafe ...')
        self._trio_token.run_sync_soon(self._callback, *args)

    def _callback(self, msg_type, response_id=None, result=None):
        # This is called in a system task. That means any bugs in our implementation will crash trio.run(...)
        # This isn't terrible (it's better than the error being passed back to the api thread!) but it prints an
        # error message like: "trio.TrioInternalError: internal error in trio - please file a bug!"... and people will
        # get quite annoyed if we followed that advice!
        #
        # One idea would be execute this code in self._nursery. However, that alone doesn't buy us much, because after
        # the error propagate to the nursery, the nursery is closed. The next message to come from the api (which will
        # probably happen no matter how fast we try to stop it?) will hit a 'closed' nursery, which will cause an error
        # in the system task anyway.
        #
        # I'm not sure it makes any sense to think about putting this in a nursery and handling cancellation:
        # if there is a bug in this api, probably the only sensible thing is to crash quick-smart and fix the bug -
        # it's not something that can be recovered from.
        self.log.debug(f'_callback {msg_type} {response_id} {result}')
        if msg_type == 'started':
            self._started.set()
        elif msg_type == 'stopped':
            self._stopped.set()
        else:
            # This is 'partial_response' or 'response' and we made the id callable
            # If BadApi forced us to use an int (for example) we would need a look-up
            response_id(msg_type, result)


async def do_request(api, i):
    receive_channel = await api.request(i)
    async with receive_channel:
        async for v in receive_channel:
            if SIMULATE_ERROR and v == 15:
                raise RuntimeError('Simulated error -- v == 15')
            print(f'request({i}): {v}')


async def main():
    api = TrioApi()
    async with trio.open_nursery() as nursery:
        await api.start(nursery)
        print('Started!')
        async with trio.open_nursery() as all_requests:
            all_requests.start_soon(do_request, api, 3)
            all_requests.start_soon(do_request, api, 4)
            all_requests.start_soon(do_request, api, 5)
        await api.stop()
        print('Stopped!')


# Set SIMULATE_ERROR to True to see an error from the client handling of a request
# Set the logging level to DEBUG for more, or WARN for none.
SIMULATE_ERROR = False
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(threadName)s %(name)s: %(message)s")
trio.run(main)
EDIT from @ziirish: enable syntax highlighting

@oremanj
Copy link
Member

oremanj commented May 2, 2019

It doesn't look like there's any action item left on this issue, so I'm going to close it. Glad you got a working solution, and thanks for posting it for the rest of us to look at! I'm leaving a link on #802 as a good example of the sorts of porting problems people are facing.

@sfuller14
Copy link

sfuller14 commented Dec 2, 2023

Below is a complete working example of wrapping a toy api that (I think) is representative of a real-life callback based API.

THANK YOU (both). I have been trying to find a workaround to this exact issue with the blpapi Python implementation for a very long time.

Hopefully someone finds this useful!

Yes. Very.

class BadApi:

Said 3rd party should consider renaming their Python library to this. The utter lack of any semblance of documentation has sadly not improved since this Issue was submitted 5 years ago...

This example doesn't even consider cancellation, but if I update it I'll come back and add some edits.

I would be extremely interested in any other tips or patterns you found when working through the above.

@0zeroth

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants