Implement select()/merge() on ReceiveChannels #1411

Closed
bergus opened this issue Feb 22, 2020 · 2 comments

Comments


bergus commented Feb 22, 2020

I want to receive values from multiple channels at once and, more importantly, receive only the first value from any of them. I have not found a good way to do that, at least not with the library's exported methods.

There's the race function example in the docs, and I can use something similar, but it appears to have several problems:

  • It doesn't really receive only the first value: it may receive any number of values and returns only one of them.
  • It creates an extra memory channel pair, a nursery, and a task for every channel on every call, and receiving a value from one channel only to send it to the next seems unnecessary and inefficient to me.
  • If I use this pattern to merge multiple ReceiveChannels into one, I cannot easily hook into the closing of the merged channel to stop (cancel) receiving from the input channels.
  • It doesn't tell me which channel the value was received from (OK, that can easily be fixed).

This looks like a large hole in the API surface of trio to me.

There was some Gitter discussion about this in August 2019, but it seems nothing came out of it.

I think the first problem (multiple receives) can be fixed if I call cancel earlier, the way wait_any from trio-util does:

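import trio

async def select(*channels):
    if not channels:
        raise trio.EndOfChannel
    # Buffer one slot per channel so that send_nowait() below can never raise
    # WouldBlock: nobody receives from this channel until the nursery has exited.
    send_result, receive_result = trio.open_memory_channel(len(channels))
    async with trio.open_nursery() as nursery:
        async def jockey(chan, i):
            val = await chan.receive()
            # Cancel the sibling tasks that are still blocked in receive().
            nursery.cancel_scope.cancel()
            send_result.send_nowait((val, i))
        for i, chan in enumerate(channels):
            nursery.start_soon(jockey, chan, i)
    # Only one buffered result is returned; if several channels were ready
    # at once, the other results are silently dropped.
    return await receive_result.receive()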

However, I would prefer a native way to do this, without repeating the helper function in every project. I think it would be rather easy to have a MemoryReceiveChannel that can receive from multiple producers, by just replacing self._state with a list of states and looping over them everywhere the state is used:

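import attr
import trio
from outcome import Error
from trio.abc import ReceiveChannel
# trio.hazmat was renamed to trio.lowlevel in later Trio releases.
from trio.hazmat import enable_ki_protection
from trio._channel import MemoryReceiveChannel  # private module

async def select(*receive_channels):
    async with merge(*receive_channels) as merged:
        return await merged.receive()

def merge(*receive_channels):
    states = []
    for chan in receive_channels:
        if isinstance(chan, MemoryMultiReceiveChannel):
            states.extend(chan._states)  # already merged: reuse all of its states
        elif isinstance(chan, MemoryReceiveChannel):
            states.append(chan._state)  # plain memory channel: a single state
        else:
            raise ValueError('Not a memory channel')
    return MemoryMultiReceiveChannel(states)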

@attr.s(eq=False)
class MemoryMultiReceiveChannel(ReceiveChannel):
    _states = attr.ib()
    _closed = attr.ib(default=False)
    _tasks = attr.ib(factory=set)

    def __attrs_post_init__(self):
        for state in self._states:
            state.open_receive_channels += 1

    def statistics(self):
        # should probably sum the statistics from all states - sans open_receive_channels?
        pass

    def __repr__(self):
        return f"<multiple receive channel at {id(self):#x}, using {len(self._states)} buffers>"

    @enable_ki_protection
    def receive_nowait(self):
        if self._closed:
            raise trio.ClosedResourceError
        open_senders = 0
        for state in self._states:
            if state.send_tasks:
                task, value = state.send_tasks.popitem(last=False)
                task.custom_sleep_data._tasks.remove(task)
                trio.hazmat.reschedule(task)
                state.data.append(value)
                # Fall through
            if state.data:
                return state.data.popleft()
            open_senders += state.open_send_channels
            # potential optimisation: remove closed channels from self._states
        if not open_senders:
            raise trio.EndOfChannel
        raise trio.WouldBlock

    @enable_ki_protection
    async def receive(self):
        await trio.hazmat.checkpoint_if_cancelled()
        try:
            value = self.receive_nowait()
        except trio.WouldBlock:
            pass
        else:
            await trio.hazmat.cancel_shielded_checkpoint()
            return value

        task = trio.hazmat.current_task()
        self._tasks.add(task)
        for state in self._states:
            state.receive_tasks[task] = None
        task.custom_sleep_data = self

        def abort_fn(_):
            self._tasks.remove(task)
            for state in self._states:
                del state.receive_tasks[task]
            return trio.hazmat.Abort.SUCCEEDED

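        value = await trio.hazmat.wait_task_rescheduled(abort_fn)
        # The sender that woke us already popped this task from its own state,
        # so remove it from the remaining states without raising KeyError.
        for state in self._states:
            state.receive_tasks.pop(task, None)
        return value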

    @enable_ki_protection
    def clone(self):
        if self._closed:
            raise trio.ClosedResourceError
        # A clone must share all states, so build another multi-receive channel.
        return MemoryMultiReceiveChannel(self._states)

    @enable_ki_protection
    async def aclose(self):
        if self._closed:
            await trio.hazmat.checkpoint()
            return
        self._closed = True
        for task in self._tasks:
            trio.hazmat.reschedule(task, Error(trio.ClosedResourceError()))
            for state in self._states:
                del state.receive_tasks[task]
        self._tasks.clear()
        for state in self._states:
            state.open_receive_channels -= 1
            if state.open_receive_channels == 0:
                assert not state.receive_tasks
                for task in state.send_tasks:
                    task.custom_sleep_data._tasks.remove(task)
                    trio.hazmat.reschedule(task, Error(trio.BrokenResourceError()))
                state.send_tasks.clear()
                state.data.clear()
        await trio.hazmat.checkpoint()

njsmith commented Feb 22, 2020

The main issue for discussing select-type functionality is #242; there's a lot of discussion there already if you want to catch up on the trade-offs we've been considering.

The tl;dr is that select-type functionality is definitely possible to implement, BUT it requires some special support in the internals of all the objects that want to support select. You can't fake it with a race or wait_any-style helper, because one of the key properties of select is that it consumes exactly one ready event, while race/wait_any might accidentally consume multiple events.

If we want to support this directly in the Trio core, then we should probably do it by providing a generic framework that lets arbitrary objects support select, and implement it for all of Trio's built-in synchronization primitives, not just memory channels. But, that would be a pretty big commitment, it raises a lot of complicated design questions, and it's not clear whether select is actually the right solution often enough to be worth it. (In a lot of cases where folks think they want select, it turns out that there's another way to write things that's easier to understand and less complicated. But probably not all? It's hard to say.)

Let's close this issue and continue the discussion in #242, to keep everything in the same place. I think the two main things that would help move this forward would be:

  • Collecting use cases for select – if you could write up the details of what you're trying to do and why select is the right solution, and post it in #242 ("Discussion: what's the best strategy for "nowait" and "select" operations?"), that would be super helpful.
  • Building experimental implementations – it's totally possible to put together a custom version of memory channels that supports select and stick it on PyPI, and that lets folks who like select experiment with API approaches and get experience using it in real problems, which would be super helpful for helping us figure out what the Trio core should do.

njsmith closed this as completed Feb 22, 2020

bergus commented Feb 22, 2020

Oh, how could I not have found that thread? Thanks, I'll head over there.
