Get conversation autotitling to use any AI backend (#500)
krschacht authored Aug 27, 2024
1 parent 1fba37e commit a15d47c
Showing 25 changed files with 609 additions and 468 deletions.
21 changes: 13 additions & 8 deletions app/jobs/autotitle_conversation_job.rb
@@ -6,23 +6,28 @@ class ConversationNotReady < StandardError; end
   queue_as :default
 
   def perform(conversation_id)
-    conversation = Conversation.find(conversation_id)
-    return false if conversation.assistant.api_service.effective_token.blank? # should we use anthropic key if that's all the user has?
+    @conversation = Conversation.find(conversation_id)
+    return false if @conversation.assistant.api_service.effective_token.blank? # should we use anthropic key if that's all the user has?
 
-    messages = conversation.messages.ordered.limit(4)
+    messages = @conversation.messages.ordered.limit(4)
     raise ConversationNotReady if messages.empty?
 
-    new_title = Current.set(user: conversation.user) do
+    new_title = Current.set(user: @conversation.user) do
       generate_title_for(messages.map(&:content_text).join("\n"))
     end
-    conversation.update!(title: new_title)
+    @conversation.update!(title: new_title)
   end
 
   private
 
   def generate_title_for(text)
-    json_response = ChatCompletionAPI.get_next_response(system_message, [text], response_format: {type: "json_object"})
-    json_response["topic"]
+    ai_backend = @conversation.assistant.api_service.ai_backend.new(@conversation.user, @conversation.assistant)
+    response = ai_backend.get_oneoff_message(
+      system_message,
+      [text],
+      # response_format: { type: "json_object" }) this causes problems for Groq even though it's supported: https://console.groq.com/docs/api-reference#chat-create
+    )
+    JSON.parse(response).dig("topic")
   end
 
   def system_message
@@ -43,7 +48,7 @@ def system_message
       Your reply (always do JSON):
       ```
-      { topic: "Rails collection counter" }
+      { "topic": "Rails collection counter" }
       ```
     END
   end
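With this change, title generation is routed through whichever AI backend the conversation's API service points at, instead of the OpenAI-only ChatCompletionAPI helper. A minimal sketch of the new call path (the conversation id and return value are illustrative; only the methods shown in the diff are assumed to exist):

```ruby
# Hypothetical console session showing the new dispatch. ApiService#ai_backend
# is expected to return a class such as AIBackend::Anthropic or AIBackend::OpenAI.
conversation = Conversation.find(42)
backend_class = conversation.assistant.api_service.ai_backend

# The conversation/message arguments are now optional, so a one-off call omits them:
backend = backend_class.new(conversation.user, conversation.assistant)
response = backend.get_oneoff_message(system_message, ["What are Rails counter caches?"])
JSON.parse(response)["topic"]  # => e.g. "Rails collection counter"
```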
4 changes: 2 additions & 2 deletions app/jobs/get_next_ai_message_job.rb
@@ -33,7 +33,7 @@ def perform(user_id, message_id, assistant_id, attempt = 1)
 
     response = Current.set(user: @user, message: @message) do
       ai_backend.new(@conversation.user, @assistant, @conversation, @message)
-        .get_next_chat_message do |content_chunk|
+        .stream_next_conversation_message do |content_chunk|
         @message.content_text += content_chunk
 
         if Time.current.to_f - last_sent_at.to_f >= 0.1
@@ -47,7 +47,7 @@ def perform(user_id, message_id, assistant_id, attempt = 1)
         end
       end
     end
-    @message.content_tool_calls = response # Typically, get_next_chat_message will simply return nil because it executes
+    @message.content_tool_calls = response # Typically, stream_next_conversation_message will simply return nil because it executes
                                            # the content_chunk block to return its response incrementally. However, tool_call
                                            # responses don't make sense to stream because they can't be executed incrementally
                                            # so we just return the full tool response message at once. The only time we return
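For contrast with the one-off path above, the renamed streaming entry point is driven by a block, which is how this job consumes it. A condensed sketch of that usage (variable names mirror the job; broadcasting and error handling omitted):

```ruby
# Condensed from the job above: plain text arrives through the block and the
# method returns nil, while tool-call responses are returned whole at the end.
response = ai_backend.new(@conversation.user, @assistant, @conversation, @message)
  .stream_next_conversation_message do |content_chunk|
    @message.content_text += content_chunk  # accumulate chunks as they stream in
  end
@message.content_tool_calls = response  # nil unless the model requested tools
```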
5 changes: 5 additions & 0 deletions app/models/language_model.rb
@@ -35,6 +35,11 @@ def created_by_current_user?
     user == Current.user
   end
 
+  def supports_tools?
+    attributes["supports_tools"] &&
+      api_service.name != "Groq" # TODO: Remove this short circuit once I can debug tool use with Groq
+  end
+
   private
 
   def populate_position
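A guard like this would typically be consulted before attaching tool definitions to an outbound request; a hypothetical call site (the params hash and assistant.tools are assumptions, not code from this commit):

```ruby
# Hypothetical: only include tool definitions when the model, and for now its
# API service (Groq is temporarily excluded), can actually handle them.
params[:tools] = assistant.tools if assistant.language_model.supports_tools?
```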
133 changes: 70 additions & 63 deletions app/services/ai_backend.rb
@@ -1,50 +1,91 @@
 class AIBackend
   include Utilities, Tools
 
   attr :client
 
-  def initialize(user, assistant, conversation, message)
+  def initialize(user, assistant, conversation = nil, message = nil)
     @user = user
     @assistant = assistant
     @conversation = conversation
-    @message = message
+    @message = message # required for streaming responses
+    @client_config = {}
+    @response_handler = nil
   end
 
+  def get_oneoff_message(instructions, messages, params = {})
+    set_client_config(
+      instructions: instructions,
+      messages: preceding_messages(messages),
+      params: params,
+    )
+    response = @client.send(client_method_name, **@client_config)
+
+    response.dig("content", 0, "text") ||
+      response.dig("choices", 0, "message", "content")
+  end
+
+  def stream_next_conversation_message(&chunk_handler)
+    @stream_response_text = ""
+    @stream_response_tool_calls = []
+    @response_handler = block_given? ? stream_handler(&chunk_handler) : nil
+
+    set_client_config(
+      instructions: full_instructions,
+      messages: preceding_conversation_messages,
+      streaming: true,
+    )
+
+    begin
+      response = @client.send(client_method_name, **@client_config)
+    rescue ::Faraday::UnauthorizedError => e
+      raise configuration_error
+    end
+
+    if @stream_response_tool_calls.present?
+      return format_parallel_tool_calls(@stream_response_tool_calls)
+    elsif @stream_response_text.blank?
+      raise ::Faraday::ParsingError
+    end
+  end
+
+  private
+
+  def client_method_name
+    raise NotImplementedError
+  end
 
-  def self.get_tool_messages_by_calling(tool_calls_response)
-    tool_calls = deep_json_parse(tool_calls_response)
+  def configuration_error
+    raise NotImplementedError
+  end
 
-    # We could parallelize function calling using ruby threads
-    tool_calls.map do |tool_call|
-      id = tool_call.dig(:id)
-      function_name = tool_call.dig(:function, :name)
-      function_arguments = tool_call.dig(:function, :arguments)
+  def set_client_config(config)
+    if config[:streaming] && @response_handler.nil?
+      raise "You configured streaming: true but did not define @response_handler"
+    end
+  end
 
-      raise "Unexpected tool call: #{id}, #{function_name}, and #{function_arguments}" if function_name.blank? || function_arguments.nil?
+  def get_response
+    raise NotImplementedError
+  end
 
-      function_response = begin
-        Toolbox.call(function_name, function_arguments)
-      rescue => e
-        puts "## Handled error calling tools: #{e.message}" unless Rails.env.test?
-        puts e.backtrace.join("\n") unless Rails.env.test?
+  def stream_response
+    raise NotImplementedError
+  end
 
-        <<~STR.gsub("\n", " ")
-          An unexpected error occurred (#{e.message}). You were querying information to help you answer a users question. Because this information
-          is not available at this time, DO NOT MAKE ANY GUESSES as you attempt to answer the users questions. Instead, consider attempting a
-          different query OR let the user know you attempted to retrieve some information but the website is having difficulties at this time.
-        STR
-      end
+  def preceding_messages(messages = [])
+    messages.map.with_index do |msg, i|
+      role = (i % 2).zero? ? "user" : "assistant"
 
       {
-        role: "tool",
-        content: function_response.to_json,
-        tool_call_id: id,
+        role: role,
+        content: msg
       }
     end
-  rescue => e
-    puts "## UNHANDLED error calling tools: #{e.message}"
-    puts e.backtrace.join("\n")
-    raise ::Faraday::ParsingError
   end
 
-  private
+  def preceding_conversation_messages
+    raise NotImplementedError
+  end
 
   def full_instructions
     s = @assistant.instructions.to_s
@@ -57,38 +98,4 @@ def full_instructions
     s += "\n\nFor the user, the current time is #{DateTime.current.strftime("%-l:%M%P")}; the current date is #{DateTime.current.strftime("%A, %B %-d, %Y")}"
     s.strip
   end
-
-  def deep_streaming_merge(hash1, hash2)
-    merged_hash = hash1.dup
-    hash2.each do |key, value|
-      if merged_hash.has_key?(key) && merged_hash[key].is_a?(Hash) && value.is_a?(Hash)
-        merged_hash[key] = deep_streaming_merge(merged_hash[key], value)
-      elsif merged_hash.has_key?(key)
-        merged_hash[key] += value
-      else
-        merged_hash[key] = value
-      end
-    end
-    merged_hash
-  end
-
-  def self.deep_json_parse(obj)
-    if obj.is_a?(Array)
-      obj.map { |item| deep_json_parse(item) }
-    else
-      converted_hash = {}
-      obj.each do |key, value|
-        if value.is_a?(Hash)
-          converted_hash[key] = deep_json_parse(value)
-        else
-          converted_hash[key] = begin
-            JSON.parse(value)
-          rescue => e
-            value
-          end
-        end
-      end
-      converted_hash
-    end
-  end
 end
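After this refactor, AIBackend is a template: get_oneoff_message and stream_next_conversation_message hold the shared request flow, while each provider subclass supplies the client method name, the configuration error class, the request config, and message formatting. A minimal hypothetical subclass (the "Acme" provider and its response shapes are invented for illustration):

```ruby
# Sketch of the contract a new provider backend must satisfy.
class AIBackend::Acme < AIBackend
  private

  def client_method_name
    :chat # the method invoked on @client via send in the base class
  end

  def configuration_error
    ::Faraday::UnauthorizedError # whatever this provider's gem raises for bad tokens
  end

  def set_client_config(config)
    super(config) # base class validates the streaming/handler pairing
    @client_config = {
      model: @assistant.language_model.provider_name,
      messages: config[:messages],
      stream: config[:streaming] && @response_handler || nil,
    }.compact
  end

  def stream_handler(&chunk_handler)
    proc do |intermediate_response, _bytesize|
      if (chunk = intermediate_response.dig("delta", "text")) # invented response shape
        @stream_response_text += chunk
        chunk_handler.call(chunk)
      end
    end
  end

  def preceding_conversation_messages
    @conversation.messages.ordered.map { |m| { role: m.role, content: m.content_text } }
  end
end
```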
67 changes: 30 additions & 37 deletions app/services/ai_backend/anthropic.rb
@@ -1,4 +1,6 @@
 class AIBackend::Anthropic < AIBackend
+  include Tools
+
   # Rails system tests don't seem to allow mocking because the server and the
   # test are in separate processes.
   #
@@ -12,7 +14,7 @@ def self.client
     end
   end
 
-  def initialize(user, assistant, conversation, message)
+  def initialize(user, assistant, conversation = nil, message = nil)
     super(user, assistant, conversation, message)
     begin
       raise ::Anthropic::ConfigurationError if assistant.api_service.requires_token? && assistant.api_service.effective_token.blank?
@@ -23,10 +25,32 @@ def initialize(user, assistant, conversation, message)
     end
   end
 
-  def get_next_chat_message(&chunk_received_handler)
-    stream_response_text = ""
+  private
+
+  def client_method_name
+    :messages
+  end
+
+  def configuration_error
+    ::Anthropic::ConfigurationError
+  end
+
+  def set_client_config(config)
+    super(config)
+
+    @client_config = {
+      model: @assistant.language_model.provider_name,
+      system: config[:instructions],
+      messages: config[:messages],
+      parameters: {
+        max_tokens: 2000, # we should really set this dynamically, based on the model, to the max
+        stream: config[:streaming] && @response_handler || nil,
+      }.compact.merge(config[:params]&.except(:response_format) || {})
+    }.compact
+  end
 
-    response_handler = proc do |intermediate_response, bytesize|
+  def stream_handler(&chunk_handler)
+    proc do |intermediate_response, bytesize|
       chunk = intermediate_response.dig("delta", "text")
 
       if (input_tokens = intermediate_response.dig("message", "usage", "input_tokens"))
@@ -39,7 +63,7 @@ def get_next_chat_message(&chunk_received_handler)
 
       print chunk if Rails.env.development?
       if chunk
-        stream_response_text += chunk
+        @stream_response_text += chunk
        yield chunk
      end
    rescue ::GetNextAIMessageJob::ResponseCancelled => e
@@ -50,40 +74,9 @@ def get_next_chat_message(&chunk_received_handler)
       puts "\nUnhandled error in AIBackend::Anthropic response handler: #{e.message}"
       puts e.backtrace
     end
+  end
 
-    response_handler = nil unless block_given?
-    response = nil
-
-    begin
-      response = @client.messages(
-        model: @assistant.language_model.provider_name,
-        system: full_instructions,
-        messages: preceding_messages,
-        parameters: {
-          max_tokens: 2000, # we should really set this dynamically, based on the model, to the max
-          stream: response_handler,
-        }
-      )
-    rescue ::Faraday::UnauthorizedError => e
-      raise ::Anthropic::ConfigurationError
-    end
-
-    response_text = if response.is_a?(Hash) && response.dig("content")
-      response.dig("content", 0, "text")
-    else
-      response
-    end
-
-    if response_text.blank? && stream_response_text.blank?
-      raise ::Faraday::ParsingError
-    else
-      response_text
-    end
-  end
-
-  private
-
-  def preceding_messages
+  def preceding_conversation_messages
     @conversation.messages.for_conversation_version(@message.version).where("messages.index < ?", @message.index).collect do |message|
       if @assistant.supports_images? && message.documents.present?
 
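Putting the pieces together, a one-off Anthropic call now reduces to roughly the following request (model name and message content are placeholders; the call shape mirrors the inline code this diff removes):

```ruby
# Roughly what @client.send(:messages, **@client_config) expands to for a
# non-streaming call; stream: is compacted away when no handler is set.
@client.messages(
  model: "claude-3-5-sonnet",                    # @assistant.language_model.provider_name
  system: "You are an expert at summarizing...", # config[:instructions]
  messages: [{ role: "user", content: "Hi" }],   # config[:messages]
  parameters: { max_tokens: 2000 }
)
```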
11 changes: 11 additions & 0 deletions app/services/ai_backend/anthropic/tools.rb
@@ -0,0 +1,11 @@
+module AIBackend::Anthropic::Tools
+  extend ActiveSupport::Concern
+
+  included do
+    private
+
+    def format_parallel_tool_calls(content_tool_calls)
+      []
+    end
+  end
+end
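This new concern supplies the format_parallel_tool_calls hook that the base class invokes when a stream produces tool calls; for Anthropic it is a stub returning an empty array, so streamed tool calls are effectively discarded for now. The relevant base-class flow, restated from the ai_backend.rb diff above:

```ruby
# From AIBackend#stream_next_conversation_message: the Anthropic stub makes
# this return [] whenever tool calls were accumulated during streaming.
if @stream_response_tool_calls.present?
  return format_parallel_tool_calls(@stream_response_tool_calls)
elsif @stream_response_text.blank?
  raise ::Faraday::ParsingError
end
```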