Is it possible to chain calls with oaib? #6

Open
aniemerg opened this issue Feb 13, 2024 · 4 comments

Comments

@aniemerg

Really cool tool, but is it possible to chain calls? I.e., batch a first call, process the result, and then add a second call to the batch that acts on the result?

I saw that there are callbacks, but I'm not sure whether they can be used to do this in a clean way.

@ctjlewis
Contributor

Let me get back to this in a few hours. There are some fires I'm trying to put out with coroutine resolution, and then we'll also have a way to save additional row values with add() and to set an index during init with Batch(index=[...]).
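
Roughly, that in-flight feature might let you do something like the sketch below; the index= and metadata= names are placeholders I'm assuming here, not the final API:

from oaib import Batch

# Hypothetical sketch only: "index" and "metadata" are assumed names for the
# feature described above, not confirmed oaib parameters.
batch = Batch(rpm=100, tpm=1_000, workers=5, index=["row_id"])

await batch.add(
    "chat.completions.create",
    metadata={"row_id": 1},  # extra row value carried through to the results
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "say hello"}],
)

results = await batch.run()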

Could you give an example of the pattern you would ideally want?

@ctjlewis
Contributor

Ah, I see what you mean now. I talked with @jvmncs about this yesterday: we could get a response for some input and then chain it in add() with a callback that might call another batch or something. I'll get back to you on this; thoughts on the pattern are welcome.
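
To make that concrete, a rough sketch of the callback-chaining idea could look like the following; the callback= keyword and the response shape passed to it are assumptions here, not a confirmed oaib interface:

from oaib import Batch

batch = Batch(rpm=100, tpm=1_000, workers=5)

async def follow_up(result):
    # Assumed shape: the raw chat completion response from the first request.
    text = result["choices"][0]["message"]["content"]
    # Queue a second, dependent request built from the first response.
    await batch.add(
        "chat.completions.create",
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": f"Refine this: {text}"}],
    )

await batch.add(
    "chat.completions.create",
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "first pass"}],
    callback=follow_up,  # assumed keyword; the actual callback hook may differ
)

results = await batch.run()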

@aniemerg
Author

aniemerg commented Feb 13, 2024

My goal would be to be able to write a function like this:

async def process_input(batch, input_data):
    # First call: queue the input and wait for its result.
    promise = await batch.add(input_data)
    result = await promise
    # Process the first result, then queue a second, dependent call.
    new_input = modify_result(result)
    second_promise = await batch.add(new_input)
    return await second_promise

where the dev experience is writing a process_input function that receives the inputs assigned to it, calls OpenAI via the "batch" class, processes the result, calls OpenAI again via batch, and returns the final output. This function could then be driven via asyncio, like:

    batch = RateLimitedBatch(rpm=100, tpm=1_000, workers=5)
    batch.start_workers()  
    inputs = [1, 2, 3]
    results = await asyncio.gather(*(process_input(batch, input_data) for input_data in inputs))
    print(results)

    await batch.shutdown()  # Shutdown workers after all tasks are processed

Here's a toy implementation of RateLimitedBatch that I pulled together so this code can run:

import asyncio
from asyncio import Queue

class RateLimitedBatch:
    def __init__(self, rpm, tpm, workers):
        self.queue = Queue()
        self.semaphore = asyncio.Semaphore(workers)  # Not used in this toy sketch
        self.workers = []
        self.total_workers = workers
        # rpm/tpm are accepted for interface parity but not enforced in this toy.
        self.rpm = rpm
        self.tpm = tpm

    async def add(self, input_data):
        # Queue the request and hand back a future that resolves with its result.
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((input_data, future))
        return future

    async def worker(self):
        while True:
            input_data, future = await self.queue.get()
            if input_data is None:  # Shutdown signal
                future.set_result(None)  # Optionally handle shutdown result
                self.queue.task_done()
                break
            # Simulate API call with a delay
            await asyncio.sleep(1)  # Placeholder for real work
            result = input_data * 2  # Example processing logic
            future.set_result(result)
            self.queue.task_done()

    def start_workers(self):
        for _ in range(self.total_workers):
            task = asyncio.create_task(self.worker())
            self.workers.append(task)

    async def shutdown(self):
        for _ in range(len(self.workers)):  # Send shutdown signal for each worker
            await self.queue.put((None, asyncio.get_running_loop().create_future()))
        await asyncio.gather(*self.workers)  # Wait for all workers to complete
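
Put together, a minimal runnable driver for this toy (with a placeholder modify_result, since it isn't defined above) could look like:

def modify_result(result):
    # Placeholder for whatever post-processing the first result needs.
    return result + 1

async def main():
    batch = RateLimitedBatch(rpm=100, tpm=1_000, workers=5)
    batch.start_workers()
    inputs = [1, 2, 3]
    results = await asyncio.gather(*(process_input(batch, i) for i in inputs))
    print(results)  # [6, 10, 14] with the doubling and increment placeholders above
    await batch.shutdown()

asyncio.run(main())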

@ctjlewis
Contributor

@aniemerg I'm kind of tied up, but it seems like this might be a higher-level refactor, and you might have a better grasp on how to drive this process with asyncio than I did when originally writing it. Would you be open to putting in a refactor to support this, and to reviewing the existing logic and rewiring anything as appropriate?
