Add streaming support (#760)
* Add streaming support

Passing a block to the chat method enables streaming.

* Add usage token support for streaming

---------

Co-authored-by: Andrei Bondarev <andrei@sourcelabs.io>
chalmagean and andreibondarev authored Sep 20, 2024
1 parent b9f8a65 commit 7a095c2
Showing 5 changed files with 64 additions and 12 deletions.
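
As the commit message notes, streaming is enabled simply by passing a block to `chat`. A minimal sketch of that usage, assuming the OpenAI adapter touched by this commit (the chunk shape, a choice hash with a `"delta"`, is inferred from the `lib/langchain/llm/openai.rb` diff below):

```ruby
require "langchain"

# A sketch, not part of the diff: stream directly from the LLM by passing a block.
llm = Langchain::LLM::OpenAI.new(api_key: ENV["OPENAI_API_KEY"])

# The block runs once per streamed chunk; for OpenAI each chunk is the first
# entry of "choices", so the incremental text lives under "delta" => "content".
llm.chat(messages: [{role: "user", content: "Hello"}]) do |chunk|
  print chunk.dig("delta", "content")
end
```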
14 changes: 13 additions & 1 deletion README.md
@@ -427,7 +427,19 @@ assistant.add_message_and_run!(content: "What's the latest news about AI?")
messages = assistant.messages

# Run the assistant with automatic tool execution
assistant.run!
assistant.run(auto_tool_execution: true)

# If you want to stream the response, you can add a response handler
assistant = Langchain::Assistant.new(
llm: llm,
instructions: "You're a helpful AI assistant",
tools: [Langchain::Tool::NewsRetriever.new(api_key: ENV["NEWS_API_KEY"])]
) do |response_chunk|
# ...handle the response stream
# print(response_chunk.inspect)
end
assistant.add_message(content: "Hello")
assistant.run(auto_tool_execution: true)
```
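
The second half of the change ("usage token support") keeps token counts available even when the response is streamed. A brief sketch, assuming the OpenAI adapter and illustrative values; the counts come from the trailing usage-only chunk requested via `stream_options`:

```ruby
# Sketch: token counts remain readable on the final response after streaming.
response = llm.chat(messages: [{role: "user", content: "Hello"}]) do |chunk|
  print chunk.dig("delta", "content")
end

response.prompt_tokens     # => e.g. 10
response.completion_tokens # => e.g. 11
response.total_tokens      # => e.g. 12
```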

### Configuration
6 changes: 4 additions & 2 deletions lib/langchain/assistants/assistant.rb
@@ -29,7 +29,8 @@ def initialize(
instructions: nil,
tool_choice: "auto",
messages: [],
add_message_callback: nil
add_message_callback: nil,
&block
)
unless tools.is_a?(Array) && tools.all? { |tool| tool.class.singleton_class.included_modules.include?(Langchain::ToolDefinition) }
raise ArgumentError, "Tools must be an array of objects extending Langchain::ToolDefinition"
@@ -48,6 +49,7 @@ def initialize(
@tools = tools
self.tool_choice = tool_choice
@instructions = instructions
@block = block
@state = :ready

@total_prompt_tokens = 0
@@ -361,7 +363,7 @@ def chat_with_llm
tools: @tools,
tool_choice: tool_choice
)
@llm.chat(**params)
@llm.chat(**params, &@block)
end

# Run the tools automatically
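
The assistant change is deliberately small: the constructor captures an optional block, and that stored proc is reused as the block on every LLM chat call. A self-contained sketch of the same Ruby pattern (the class and method names here are illustrative, not part of the library):

```ruby
# Illustrative pattern only: forward a proc captured at construction time as a
# block to a later method call, mirroring `@block = block` and `chat(&@block)`.
class StreamingForwarder
  def initialize(&block)
    @block = block # may be nil when no handler was supplied
  end

  def call_chat(llm, **params)
    # `&nil` means "no block", so this is safe whether or not a handler exists.
    llm.chat(**params, &@block)
  end
end
```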
10 changes: 6 additions & 4 deletions lib/langchain/llm/openai.rb
@@ -122,11 +122,11 @@ def chat(params = {}, &block)
raise ArgumentError.new("'tool_choice' is only allowed when 'tools' are specified.")
end

# TODO: Clean this part up
if block
@response_chunks = []
parameters[:stream_options] = {include_usage: true}
parameters[:stream] = proc do |chunk, _bytesize|
chunk_content = chunk.dig("choices", 0)
chunk_content = chunk.dig("choices", 0) || {}
@response_chunks << chunk
yield chunk_content
end
@@ -177,7 +177,9 @@ def with_api_error_handling
end

def response_from_chunks
grouped_chunks = @response_chunks.group_by { |chunk| chunk.dig("choices", 0, "index") }
grouped_chunks = @response_chunks
.group_by { |chunk| chunk.dig("choices", 0, "index") }
.except(nil) # the last chunk (that contains the token usage) has no index
final_choices = grouped_chunks.map do |index, chunks|
{
"index" => index,
Expand All @@ -189,7 +191,7 @@ def response_from_chunks
"finish_reason" => chunks.last.dig("choices", 0, "finish_reason")
}
end
@response_chunks.first&.slice("id", "object", "created", "model")&.merge({"choices" => final_choices})
@response_chunks.first&.slice("id", "object", "created", "model")&.merge({"choices" => final_choices, "usage" => @response_chunks.last["usage"]})
end

def tool_calls_from_choice_chunks(choice_chunks)
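
Taken together, the OpenAI changes (a) request usage data with `stream_options: {include_usage: true}`, (b) tolerate the final chunk whose `choices` array is empty, and (c) fold that chunk's `"usage"` into the reconstructed response. A standalone sketch of the reassembly with hypothetical chunk data (note that `Hash#except` needs Ruby 3.0+ or ActiveSupport):

```ruby
# Hypothetical streamed chunks: two content deltas plus a trailing usage-only
# chunk, which carries no choice index.
chunks = [
  {"id" => "chatcmpl-1", "choices" => [{"index" => 0, "delta" => {"content" => "Hel"}, "finish_reason" => nil}]},
  {"id" => "chatcmpl-1", "choices" => [{"index" => 0, "delta" => {"content" => "lo"}, "finish_reason" => "stop"}]},
  {"id" => "chatcmpl-1", "choices" => [], "usage" => {"prompt_tokens" => 10, "completion_tokens" => 11, "total_tokens" => 12}}
]

# Group deltas by choice index and drop the index-less usage chunk.
grouped = chunks.group_by { |c| c.dig("choices", 0, "index") }.except(nil)

choices = grouped.map do |index, group|
  {
    "index" => index,
    "message" => {
      "role" => "assistant",
      "content" => group.map { |c| c.dig("choices", 0, "delta", "content") }.join
    },
    "finish_reason" => group.last.dig("choices", 0, "finish_reason")
  }
end

final = chunks.first.slice("id").merge("choices" => choices, "usage" => chunks.last["usage"])
# final["choices"][0]["message"]["content"] #=> "Hello"
# final["usage"]["total_tokens"]            #=> 12
```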
23 changes: 23 additions & 0 deletions spec/langchain/assistants/assistant_spec.rb
@@ -126,6 +126,29 @@
# Replaces the previous system message
expect(assistant.messages.first.content).to eq("You are an expert assistant")
end

context "when a block is provided" do
it "passes the block to the chat call" do
response = double(
"Response",
tool_calls: [],
role: "assistant",
chat_completion: :completed,
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 0,
completion: nil
)
callback = double("Callback")
assistant = described_class.new(llm: llm) { callback }
assistant.add_message(content: "foo")

expect(llm).to receive(:chat).with(any_args) do |&block|
expect(block.call).to eq(callback)
end.and_return(response)
assistant.run
end
end
end

describe "#add_message" do
23 changes: 18 additions & 5 deletions spec/langchain/llm/openai_spec.rb
@@ -542,29 +542,34 @@
end

context "with streaming" do
let(:streamed_content) { [] }
let(:streamed_response_chunk) do
{
"id" => "chatcmpl-7Hcl1sXOtsaUBKJGGhNujEIwhauaD",
"choices" => [{"index" => 0, "delta" => {"content" => answer}, "finish_reason" => nil}]
}
end
let(:token_usage) do
{
"usage" => {"prompt_tokens" => 10, "completion_tokens" => 11, "total_tokens" => 12}
}
end

it "handles streaming responses correctly" do
allow(subject.client).to receive(:chat) do |parameters|
parameters[:parameters][:stream].call(streamed_response_chunk)
streamed_response_chunk
parameters[:parameters][:stream].call(token_usage)
end
response = subject.chat(messages: [content: prompt, role: "user"]) do |chunk|
chunk
end
expect(response).to be_a(Langchain::LLM::OpenAIResponse)
expect(response.chat_completion).to eq(answer)
expect(response.prompt_tokens).to eq(10)
expect(response.completion_tokens).to eq(11)
expect(response.total_tokens).to eq(12)
end
end

context "with streaming and multiple choices n=2" do
let(:streamed_content) { [] }
let(:answer) { "Hello how are you?" }
let(:answer_2) { "Alternative answer" }
let(:streamed_response_chunk) do
@@ -579,12 +584,17 @@
"choices" => [{"index" => 1, "delta" => {"content" => answer_2}, "finish_reason" => "stop"}]
}
end
let(:token_usage) do
{
"usage" => {"prompt_tokens" => 10, "completion_tokens" => 11, "total_tokens" => 12}
}
end

it "handles streaming responses correctly" do
allow(subject.client).to receive(:chat) do |parameters|
parameters[:parameters][:stream].call(streamed_response_chunk)
parameters[:parameters][:stream].call(streamed_response_chunk_2)
streamed_response_chunk
parameters[:parameters][:stream].call(token_usage)
end
response = subject.chat(messages: [content: prompt, role: "user"], n: 2) do |chunk|
chunk
@@ -596,6 +606,9 @@
{"index" => 1, "message" => {"role" => "assistant", "content" => answer_2}, "finish_reason" => "stop"}
]
)
expect(response.prompt_tokens).to eq(10)
expect(response.completion_tokens).to eq(11)
expect(response.total_tokens).to eq(12)
end
end

