
Create cookbook for LlamaIndex Workflow abstraction #138

Open · wants to merge 8 commits into main

Conversation

tituslhy

This cookbook aims to provide an example of how to use LlamaIndex's latest Workflow abstraction with Chainlit.
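
At its core, a Workflow is a class whose async @step methods consume and emit events, with StartEvent and StopEvent marking a run's boundaries. A minimal sketch for readers new to the abstraction (names are illustrative):

from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step

class EchoFlow(Workflow):
    @step
    async def echo(self, ev: StartEvent) -> StopEvent:
        # Keyword arguments to run() surface as attributes on the StartEvent.
        return StopEvent(result=f"You said: {ev.query}")

# result = await EchoFlow().run(query="hello")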

@JerePlum99

@tituslhy This is great - I was looking into doing this myself and came across your implementation.

Have you experimented at all with nesting steps using the parent/child structure, or with updating the input/output for each step? I've figured out how to update input/output, but I've struggled to properly nest the parent/child steps between runs.
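
For context on the Chainlit side: steps created inside a running step become its children automatically, and a step's input/output fields can be set directly. A minimal sketch (step names are illustrative):

import chainlit as cl

@cl.step(type="tool", name="parent")
async def parent_step():
    # A step opened inside a running step is nested under it automatically.
    async with cl.Step(name="child", type="llm") as child:
        child.input = "child input"    # rendered in the step's input panel
        child.output = "child output"  # rendered in the step's output panel
    return "parent output"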

@tituslhy
Author

Do you mean running a nested workflow, or running a nested workflow with chain of thought in Chainlit?

I've had success with the first but haven't tried the second!
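
For the first case, LlamaIndex's nested-workflow pattern declares the inner workflow as a step parameter with a default instance; the compile step in the snippet below uses the same pattern. A minimal sketch:

from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step

class SubFlow(Workflow):
    @step
    async def do_work(self, ev: StartEvent) -> StopEvent:
        return StopEvent(result=f"sub handled: {ev.query}")

class MainFlow(Workflow):
    @step
    async def delegate(self, ev: StartEvent, sub: Workflow = SubFlow()) -> StopEvent:
        # The nested workflow runs to completion inside the parent step.
        result = await sub.run(query=ev.query)
        return StopEvent(result=result)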

@JerePlum99
Copy link

@tituslhy a nested chain of thought within Chainlit using LlamaIndex workflows.

Similar to the default LangChain callback implementation.

@tituslhy
Author

tituslhy commented Oct 23, 2024

Ah, I wasn't able to get that working either. The nested workflow runs, but I can't display the steps taken by the inner workflow in the chain of thought. I guess that's OK; you don't want to crowd the frontend with too many progress indicators, especially if your workflows and nested workflows have many steps.

# Definitions and imports
import os
from typing import Annotated, Any, List, Optional

import chainlit as cl
from llama_index.core import Settings
from llama_index.core.agent import ReActAgent
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.core.llms import LLM, ChatMessage, MessageRole
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.tools.tavily_research import TavilyToolSpec

# Assumed setup, not shown in the original snippet: a configured LLM and the
# Tavily search tools handed to the ReAct agent below.
llm = Settings.llm
search_tools = TavilyToolSpec(api_key=os.environ["TAVILY_API_KEY"]).to_tool_list()
class SearchEvent(Event):
    """Requires the LLM to do an online search to answer the question"""
    query: Annotated[str, "The user's query"]

class AnswerEvent(Event):
    """Allows the LLM to answer the question without searching"""
    query: Annotated[str, "The user's query"]

class ResponseEvent(Event):
    """Collects LLM response"""
    query: Annotated[str, "The user's query"]
    answer: Annotated[str, "The LLM's response"]

### Define workflow
class HashTagWorkflow(Workflow):
    """Single-step workflow that decorates a response with 1-3 hashtags."""

    def __init__(self, *args: Any, **kwargs: Any):
        self.llm = llm  # uses the module-level LLM configured above
        super().__init__(*args, **kwargs)

    @cl.step(type="llm")
    @step
    async def generate_hashtag(self, ev: StartEvent) -> StopEvent:
        """Generates hashtags for the text passed in via run(response=...)."""
        response = await self.llm.acomplete(
            f"Generate 1-3 hashtags related to {ev.response}"
        )
        return StopEvent(result=str(response))

class MixtureOfAnswers(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: Optional[LLM] = llm,
        **kwargs: Any,
    ):
        """Class constructor. Takes in an LLM instance and constructs:
        1. A ReAct agent equipped with search tools
        2. A simple chat engine instance
        3. A common memory instance shared across the workflow

        Args:
            llm (Optional[LLM], optional): LLM instance. Defaults to Settings.llm.
        """
        super().__init__(*args, **kwargs)
        self.llm = llm
        self.search_agent = ReActAgent.from_tools(
            tools=search_tools,
            llm=self.llm,
        )
        self.answer_without_search_engine = SimpleChatEngine.from_defaults(
            llm=self.llm
        )
        self.history: List[ChatMessage] = []
    
    @cl.step(type="llm")
    @step()
    async def route_to_llm(
        self,
        ev: StartEvent
    ) -> SearchEvent | AnswerEvent:
        """Generates a search event and an answer event once given a start event"""
        
        ## Update memory
        self.history.append(
            ChatMessage(
                role = MessageRole.USER,
                content = ev.query
            )
        )
        
        ## Routes to both events. But you can also write a router component to decide 
        ## which event to route to.
        self.send_event(SearchEvent(query = ev.query))
        self.send_event(AnswerEvent(query = ev.query))
    
    @cl.step(type="tool")
    @step()
    async def search_and_answer(
        self,
        ev: SearchEvent
    ) -> ResponseEvent:
        """Uses the tavily search tool to answer the question"""
        
        ## Synthesize response
        response = await self.search_agent.achat(
            ev.query, 
            chat_history = self.history
        )
        
        ## [OPTIONAL] Show intermediate response in the frontend
        # await cl.Message(content="ANSWER WITH SEARCH: " + str(response)).send()
        
        ## Update memory
        self.history.append(
            ChatMessage(
                role = MessageRole.ASSISTANT,
                content = "ANSWER WITH SEARCH: " + str(response)
            )
        )
        
        return ResponseEvent(query = ev.query, answer = str(response))

    @cl.step(type="llm")
    @step()
    async def simply_answer(
        self,
        ev: AnswerEvent
    ) -> ResponseEvent:
        """Uses the LLM to simple answer the question"""
        
        ## Synthesize response
        response = await self.answer_without_search_engine.achat(
            ev.query, 
            chat_history = self.history
        )
        
        ## [OPTIONAL] Show intermediate response in the frontend
        # await cl.Message(content="ANSWER WITHOUT SEARCH: " + str(response)).send()
        
        ## Update memory
        self.history.append(
            ChatMessage(
                role = MessageRole.ASSISTANT,
                content = "ANSWER WITHOUT SEARCH: " + str(response)
            )
        )
        
        return ResponseEvent(query = ev.query, answer = str(response))
    
    @cl.step(type="llm")
    @step()
    async def compile(
        self,
        ctx: Context,
        ev: ResponseEvent,
        hashtag_workflow: Workflow = HashTagWorkflow()
    ) -> StopEvent:
        """Compiles and summarizes answers from all response events"""
        
        ## There are 2 response events from routing to 2 different agents. This can
        ## also be a dynamic number of events.
        ready = ctx.collect_events(ev, [ResponseEvent] * 2) 
        
        if ready is None:
            return None
        
        response = await self.llm.acomplete(
            f"""
            A user has asked us a question and we have responded accordingly using a 
            search tool and without using a search tool. Your job is to decide which 
            response best answered the question and summarize the response into a crisp 
            reply. If both responses answered the question, summarize both responses
            into a single answer.
            
            The user's query was: {ev.query}
            
            The responses are:
            {ready[0].answer} &
            {ready[1].answer}
            """
        )
        
        ## Add hashtag
        hashtag = await hashtag_workflow.run(response=str(response))
        
        ## Update memory
        self.history.append(
            ChatMessage(
                role = MessageRole.ASSISTANT,
                content = "FINAL ANSWER: " + str(response) + str(hashtag)
            )
        )
        
        return StopEvent(result = str(response) + str(hashtag))
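
To wire the workflow into Chainlit, a minimal pair of handlers might look like this (the timeout value and session key are illustrative):

import chainlit as cl

@cl.on_chat_start
async def on_start():
    # One workflow instance per user session.
    cl.user_session.set("workflow", MixtureOfAnswers(timeout=120, verbose=True))

@cl.on_message
async def on_message(message: cl.Message):
    workflow = cl.user_session.get("workflow")
    # Keyword arguments to run() surface as attributes on the StartEvent.
    result = await workflow.run(query=message.content)
    await cl.Message(content=str(result)).send()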
