Support for Semaphore / Parallel Execution / asyncio.FIRST_COMPLETED? #1

@patrickvonplaten

Description

Amazing library! I definitely share your pain with error handling in asyncio!

Question: Can tinyio support asyncio.FIRST_COMPLETED / parallel execution / semaphore use cases?

Background

A common use case many people have with asyncio (especially when it comes to data loading in ML training runs) is to combine asyncio with ThreadPoolExecutors and/or asyncio's Semaphore.

More specifically:

  • You want to lazily load large datasets for training. A Python iterator is a common abstraction for this.
  • Iterators are then chained together, with filtering/transformation steps added along the way. The final pipeline takes you from a "load data" iterator -> ... -> a "sample batch for training" iterator.
  • Many of the intermediate transformations are blocking and can be very slow and heavy, and async functions may also be called (requests to a server, etc.)
    => A common approach to this is to combine a ThreadPoolExecutor with asyncio.Semaphore logic
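To make the chaining pattern above concrete, here is a minimal sketch of such an iterator pipeline. The function names (`load_data`, `filter_even`, `batch`) are illustrative, not part of any real API:

```python
# Hypothetical iterator pipeline: load -> filter -> batch.

def load_data():
    # Lazily "load" raw records; in practice this would read from disk or network.
    for i in range(10):
        yield i

def filter_even(records):
    # Intermediate filtering step.
    return (r for r in records if r % 2 == 0)

def batch(records, size):
    # Final step: group records into training batches.
    buf = []
    for r in records:
        buf.append(r)
        if len(buf) == size:
            yield buf
            buf = []
    if buf:
        yield buf

pipeline = batch(filter_even(load_data()), size=2)
print(list(pipeline))  # [[0, 2], [4, 6], [8]]
```

Each stage is lazy, so nothing is loaded until the final iterator is consumed; the pain starts when some stages need to run concurrently, as in the example below.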

For example:

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
import random

# Example slow function
def slow_transform(x):
    time.sleep(random.uniform(0.5, 2.0))  # simulate slow IO-bound work
    return x * x

# Async wrapper for the synchronous slow_transform function.
# This blocking -> async -> blocking wrapping is very annoying and makes error management difficult.
async def async_slow_transform(x, semaphore, executor):
    async with semaphore:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(executor, slow_transform, x)
        return result

# Main coroutine that processes the iterator
async def process_iterator(iterator, max_concurrent=16):
    semaphore = asyncio.Semaphore(max_concurrent)
    executor = ThreadPoolExecutor(max_workers=max_concurrent)

    tasks = set()

    for item in iterator:
        task = asyncio.create_task(async_slow_transform(item, semaphore, executor))
        tasks.add(task)

        if len(tasks) >= max_concurrent:
            # Wait for the first task to finish and yield its result immediately
            done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for completed_task in done:
                yield completed_task.result()

    # Yield remaining tasks
    for task in asyncio.as_completed(tasks):
        yield await task

    executor.shutdown()

# Just an example
async def main():
    iterator = range(1, 51)  # Example iterator
    async for result in process_iterator(iterator):
        print("Result:", result)

# Run the main coroutine
if __name__ == "__main__":
    asyncio.run(main())

=> Do you think tinyio could simplify the above workflow significantly and improve error handling?
