
The repo doesn't support function calls #2

Open
dividor opened this issue Jun 11, 2024 · 3 comments · May be fixed by #6
dividor commented Jun 11, 2024

Hi There,

I am not seeing any function-call support: function calls aren't included in the handler, and testing with functions results in no activity. Basically, there is no ...

if delta.type == "function":

... and code to handle that output in the handler.
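For reference, a minimal sketch of the kind of hooks that seem to be missing (the on_tool_call_* method names exist on the openai SDK's AssistantEventHandler; the bodies here are just illustrative):

from openai import AssistantEventHandler
from typing_extensions import override

class EventHandler(AssistantEventHandler):
    @override
    def on_tool_call_created(self, tool_call):
        # Fires when the assistant starts a tool call (e.g. a function call)
        print(f"Tool call started: {tool_call.type}")

    @override
    def on_tool_call_delta(self, delta, snapshot):
        # Streamed chunks of the function name and arguments arrive here
        if delta.type == "function":
            print(delta.function)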

Is there any plan to include this pretty important feature, please?

Thanks!


dividor commented Jun 11, 2024

I tried applying OpenAI's approach here, and when I tried to use the async counterparts I got this ...

"TypeError: 'AsyncAssistantStreamManager' object does not support the context manager protocol"

In the end, the only way I could get this to work was to use the non-async code in an async on_event method. I suspect there are more elegant ways, but here is a snippet of code to add ...

from openai import AssistantEventHandler, OpenAI
from typing_extensions import override

Then in the event handler class ...

    @override
    async def on_event(self, event):
        # Retrieve events that are denoted with 'requires_action'
        # since these will have our tool_calls
        if event.event == 'thread.run.requires_action':
            run_id = event.data.id  # Retrieve the run ID from the event data
            for tool in event.data.required_action.submit_tool_outputs.tool_calls:
                if tool.function.name == "execute_query_action_localhost":
                    with sync_openai_client.beta.threads.runs.submit_tool_outputs_stream(
                        thread_id=self.current_run.thread_id,
                        run_id=self.current_run.id,
                        tool_outputs=[{"tool_call_id": tool.id, "output": "The price of rice is 203.5, beans is 400"}],
                        event_handler=AssistantEventHandler(),
                    ) as stream:
                        msg = await cl.Message(author=self.assistant_name, content="").send()
                        for text in stream.text_deltas:
                            print(text, end="", flush=True)
                            await msg.stream_token(text)
                        await msg.update()

You'd need to handle the function call, the above just provides dummy data to test.
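(To handle a real function call instead of the dummy output, a dispatch along these lines could replace the hard-coded string; the tool registry and the JSON parsing here are my own assumptions, not code from the repo:)

import json

# Hypothetical registry mapping tool names to local callables
AVAILABLE_TOOLS = {
    "execute_query_action_localhost": execute_query_action_localhost,  # assumed to exist
}

def call_tool(tool):
    fn = AVAILABLE_TOOLS.get(tool.function.name)
    if fn is None:
        return f"Unknown tool: {tool.function.name}"
    # The Assistants API sends arguments as a JSON-encoded string
    kwargs = json.loads(tool.function.arguments or "{}")
    return str(fn(**kwargs))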

EDIT: I don't think the above works very well. In cases where we want a second iteration (e.g. code or SQL returned an error and we want the assistant to correct it), it seems to return 'on_tool_call_done' after submitting tool results, and then stops.


dividor commented Jun 16, 2024

In the end, I implemented a synchronous event handler because I just couldn't seem to get the async handling to work with streaming and tool outputs. I could kludge it using run_sync, but on submitting tool outputs, the process would complete.

I don't claim the below is a long-term solution at all; hopefully the maintainers can figure out the issue, but in case it's useful, this seems to work ...

Using chainlit utilities for running sync functions ...

from chainlit import make_async, run_sync
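(For context: run_sync executes an awaitable from synchronous code by scheduling it on Chainlit's event loop, which is what lets a synchronous handler still drive async Chainlit calls, e.g.:)

# Hypothetical usage: run_sync bridges sync handler code to async Chainlit calls
msg = run_sync(cl.Message(content="Hello").send())  # send without await
run_sync(msg.stream_token(" world"))  # stream a token from sync code
run_sync(msg.update())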

The (synchronous) handler ...

class EventHandler(AssistantEventHandler):

    def __init__(self, assistant_name: str) -> None:
        """
        Initializes a new instance of the event handler.

        Args:
            assistant_name (str): The name of the assistant.

        Returns:
            None
        """
        super().__init__()
        self.current_message: cl.Message = None
        self.current_step: cl.Step = None
        self.current_tool_call = None
        self.current_message_text = ""
        self.assistant_name = assistant_name

    @override
    def on_event(self, event):
        """
        Handles the incoming event and performs the necessary actions based on the event type.

        Args:
            event: The event object containing information about the event.

        Returns:
            None
        """
        print(event.event)
        run_id = event.data.id
        # Note: this must be a single if/elif chain; a second bare `if` would
        # let "thread.message.created" fall through to the else branch below
        # and be logged as unhandled.
        if event.event == "thread.message.created":
            self.current_message = run_sync(cl.Message(content="").send())
            self.current_message_text = ""
            print("Run started")
        elif event.event == "thread.message.completed":
            self.handle_message_completed(event.data, run_id)
        elif event.event == "thread.run.requires_action":
            self.handle_requires_action(event.data, run_id)
        elif event.event == "thread.message.delta":
            self.handle_message_delta(event.data)
        else:
            print(json.dumps(str(event.data), indent=4))
            print(f"Unhandled event: {event.event}")

    def handle_message_delta(self, data):
        """
        Handles the message delta data.

        Args:
            data: The message delta data.

        Returns:
            None
        """
        for content in data.delta.content:
            if content.type == "text":
                text = content.text.value
                self.current_message_text += text
                run_sync(self.current_message.stream_token(text))
            elif content.type == "image_file":
                file_id = content.image_file.file_id
                image_data = sync_openai_client.files.content(file_id)
                image_data_bytes = image_data.read()
                png_file = f"{images_loc}{file_id}.png"
                print(f"Writing image to {png_file}")
                with open(png_file, "wb") as file:
                    file.write(image_data_bytes)
                image = cl.Image(path=png_file, display="inline", size="large")
                print(f"Image: {png_file}")
                # Append outside the emptiness check, so every image is attached
                if not self.current_message.elements:
                    self.current_message.elements = []
                self.current_message.elements.append(image)
                run_sync(self.current_message.update())
            else:
                print(f"Unhandled delta type: {content.type}")

    def handle_message_completed(self, data, run_id):
        """
        Handles the completion of a message.

        Args:
            data: The data associated with the completed message.
            run_id: The ID of the message run.

        Returns:
            None
        """
        # Add footer to the message. We have to start a new message so it's in the right order
        # TODO combine streaming with image and footer
        run_sync(self.current_message.update())
        self.current_message = run_sync(cl.Message(content="").send())

        word_count = len(self.current_message_text.split())
        if word_count > 10:
            run_sync(self.current_message.stream_token(llm_footer))
        run_sync(self.current_message.update())

    def handle_requires_action(self, data, run_id):
        """
        Handles the required action by executing the specified tools and submitting the tool outputs.

        Args:
            data: The data containing the required action information.
            run_id: The ID of the current run.

        Returns:
            None
        """
        tool_outputs = []

        for tool in data.required_action.submit_tool_outputs.tool_calls:
            print(tool)

            function_name = tool.function.name
            function_args = tool.function.arguments

            function_output = run_function(function_name, function_args)

            tool_outputs.append({"tool_call_id": tool.id, "output": function_output})

        print("TOOL OUTPUTS: ")

        print(tool_outputs)

        # Submit all tool_outputs at the same time
        self.submit_tool_outputs(tool_outputs, run_id)

    def submit_tool_outputs(self, tool_outputs, run_id):
        """
        Submits the tool outputs to the current run.

        Args:
            tool_outputs (list): A list of tool outputs to be submitted.
            run_id (str): The ID of the current run.

        Returns:
            None
        """
        with sync_openai_client.beta.threads.runs.submit_tool_outputs_stream(
            thread_id=self.current_run.thread_id,
            run_id=self.current_run.id,
            tool_outputs=tool_outputs,
            event_handler=EventHandler(assistant_name=self.assistant_name),
        ) as stream:
            # Needs this loop (the stream has to be consumed), or it doesn't work! :)
            for text in stream.text_deltas:
                print(text, end="", flush=True)


def run_function(function_name, function_args):
    """
    Run a function with the given name and arguments.

    Args:
        function_name (str): The name of the function to run.
        function_args (dict): The arguments to pass to the function.

    Returns:
        Any: The output of the function
    """
    if not hasattr(sys.modules[__name__], function_name):
        raise Exception(f"Function {function_name} not found")

    try:
        eval_str = f"{function_name}(**{function_args})"
        print(f"Running function: {eval_str}")
        output = eval(eval_str)

        if isinstance(output, bytes):
            output = output.decode("utf-8")
        print(output)

    except Exception as e:
        print(f"Error running function {function_name}: {e}")
        output = f"{e}"

    return output
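(A note of caution: eval() on model-controlled strings is risky. A somewhat safer sketch, assuming the arguments arrive as a JSON string and the target callable lives in this module, looks the function up and parses the arguments instead:)

import json
import sys

def run_function_safe(function_name, function_args):
    # Look the callable up by name instead of building an eval() string
    fn = getattr(sys.modules[__name__], function_name, None)
    if fn is None:
        raise Exception(f"Function {function_name} not found")
    try:
        # Arguments come from the API as a JSON-encoded string
        output = fn(**json.loads(function_args))
        if isinstance(output, bytes):
            output = output.decode("utf-8")
    except Exception as e:
        print(f"Error running function {function_name}: {e}")
        output = f"{e}"
    return output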

The handler is then wired up with (note the sync client) ...

with sync_openai_client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant.id,
    event_handler=EventHandler(assistant_name=assistant.name),
) as stream:
    stream.until_done()

NOT an elegant solution. :)

tjroamer commented

Unfortunately the suggested code didn't work for me. I changed the function submit_tool_outputs as follows, and it worked:

    async def submit_tool_outputs(self, tool_outputs, run_id):
        """
        Submits the tool outputs to the current run.

        Args:
            tool_outputs (list): A list of tool outputs to be submitted.
            run_id (str): The ID of the current run.

        Returns:
            None
        """
        async with async_openai_client.beta.threads.runs.submit_tool_outputs_stream(
            thread_id=self.current_run.thread_id,
            run_id=self.current_run.id,
            tool_outputs=tool_outputs,
            event_handler=EventHandler(assistant_name=self.assistant_name),
        ) as stream:
            await stream.until_done()
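Presumably the top-level run then needs to be started with the async client as well, something like (a sketch mirroring the pattern above, untested):

# Start the run with the async client so the async submit above fits in
async with async_openai_client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant.id,
    event_handler=EventHandler(assistant_name=assistant.name),
) as stream:
    await stream.until_done()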
