Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -891,16 +891,14 @@ async def handle_documents(
if memory_tool and code_interpreter_tool:
# if both memory and code_interpreter are available, we download the URLs
# and attach the data to the last message.
msg = await attachment_message(self.tempdir, url_items)
input_messages.append(msg)
await attachment_message(self.tempdir, url_items, input_messages[-1])
# Since memory is present, add all the data to the memory bank
await self.add_to_session_vector_db(session_id, documents)
elif code_interpreter_tool:
# if only code_interpreter is available, we download the URLs to a tempdir
# and attach the path to them as a message to inference with the
# assumption that the model invokes the code_interpreter tool with the path
msg = await attachment_message(self.tempdir, url_items)
input_messages.append(msg)
await attachment_message(self.tempdir, url_items, input_messages[-1])
elif memory_tool:
# if only memory is available, we load the data from the URLs and content items to the memory bank
await self.add_to_session_vector_db(session_id, documents)
Expand Down Expand Up @@ -967,8 +965,8 @@ async def load_data_from_urls(urls: List[URL]) -> List[str]:
return data


async def attachment_message(tempdir: str, urls: List[URL]) -> ToolResponseMessage:
content = []
async def attachment_message(tempdir: str, urls: List[URL], message: UserMessage) -> None:
contents = []

for url in urls:
uri = url.uri
Expand All @@ -988,16 +986,19 @@ async def attachment_message(tempdir: str, urls: List[URL]) -> ToolResponseMessa
else:
raise ValueError(f"Unsupported URL {url}")

content.append(
contents.append(
TextContentItem(
text=f'# User provided a file accessible to you at "{filepath}"\nYou can use code_interpreter to load and inspect it.'
)
)

return ToolResponseMessage(
call_id="",
content=content,
)
if isinstance(message.content, list):
message.content.extend(contents)
else:
if isinstance(message.content, str):
message.content = [TextContentItem(text=message.content)] + contents
else:
message.content = [message.content] + contents

Comment on lines +995 to 1002
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Append to user message instead since litellm requires tool responses message to follow a message with tool_call, which does make sense imo.


def _interpret_content_as_attachment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ async def _get_params(self, request: ChatCompletionRequest) -> dict:
if request.tools:
input_dict["tools"] = [convert_tooldef_to_openai_tool(tool) for tool in request.tools]
if request.tool_config.tool_choice:
input_dict["tool_choice"] = request.tool_config.tool_choice.value
input_dict["tool_choice"] = (
request.tool_config.tool_choice.value
if isinstance(request.tool_config.tool_choice, ToolChoice)
else request.tool_config.tool_choice
)

provider_data = self.get_request_provider_data()
key_field = self.provider_data_api_key_field
Expand Down
188 changes: 103 additions & 85 deletions llama_stack/providers/utils/inference/openai_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,26 +527,30 @@ async def convert_message_to_openai_dict_new(
async def _convert_message_content(
content: InterleavedContent,
) -> Union[str, Iterable[OpenAIChatCompletionContentPartParam]]:
async def impl():
async def impl(
content_: InterleavedContent,
) -> Union[str, OpenAIChatCompletionContentPartParam, List[OpenAIChatCompletionContentPartParam]]:
# Llama Stack and OpenAI spec match for str and text input
if isinstance(content, str):
return content
elif isinstance(content, TextContentItem):
if isinstance(content_, str):
return content_
elif isinstance(content_, TextContentItem):
return OpenAIChatCompletionContentPartTextParam(
type="text",
text=content.text,
text=content_.text,
)
elif isinstance(content, ImageContentItem):
elif isinstance(content_, ImageContentItem):
return OpenAIChatCompletionContentPartImageParam(
type="image_url",
image_url=OpenAIImageURL(url=await convert_image_content_to_url(content)),
image_url=OpenAIImageURL(url=await convert_image_content_to_url(content_)),
)
elif isinstance(content, list):
return [await _convert_message_content(item) for item in content]
elif isinstance(content_, list):
return [await impl(item) for item in content_]
else:
raise ValueError(f"Unsupported content type: {type(content)}")
raise ValueError(f"Unsupported content type: {type(content_)}")

ret = await impl()
ret = await impl(content)

# OpenAI*Message expects a str or list
if isinstance(ret, str) or isinstance(ret, list):
return ret
else:
Expand All @@ -566,13 +570,14 @@ async def impl():
OpenAIChatCompletionMessageToolCall(
id=tool.call_id,
function=OpenAIFunction(
name=tool.tool_name,
name=tool.tool_name if not isinstance(tool.tool_name, BuiltinTool) else tool.tool_name.value,
arguments=json.dumps(tool.arguments),
),
type="function",
)
for tool in message.tool_calls
],
]
or None,
)
elif isinstance(message, ToolResponseMessage):
out = OpenAIChatCompletionToolMessage(
Expand Down Expand Up @@ -858,7 +863,8 @@ async def convert_openai_chat_completion_stream(
event_type = ChatCompletionResponseEventType.progress

stop_reason = None
toolcall_buffer = {}
tool_call_idx_to_buffer = {}

async for chunk in stream:
choice = chunk.choices[0] # assuming only one choice per chunk

Expand All @@ -868,7 +874,6 @@ async def convert_openai_chat_completion_stream(

# if there's a tool call, emit an event for each tool in the list
# if tool call and content, emit both separately

if choice.delta.tool_calls:
# the call may have content and a tool call. ChatCompletionResponseEvent
# does not support both, so we emit the content first
Expand All @@ -889,44 +894,53 @@ async def convert_openai_chat_completion_stream(
)

if not enable_incremental_tool_calls:
yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
event_type=next(event_type),
delta=ToolCallDelta(
tool_call=_convert_openai_tool_calls(choice.delta.tool_calls)[0],
parse_status=ToolCallParseStatus.succeeded,
),
logprobs=_convert_openai_logprobs(logprobs),
for tool_call in choice.delta.tool_calls:
yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
event_type=event_type,
delta=ToolCallDelta(
tool_call=_convert_openai_tool_calls([tool_call])[0],
parse_status=ToolCallParseStatus.succeeded,
),
logprobs=_convert_openai_logprobs(logprobs),
)
)
)
else:
tool_call = choice.delta.tool_calls[0]
if "name" not in toolcall_buffer:
toolcall_buffer["call_id"] = tool_call.id
toolcall_buffer["name"] = None
toolcall_buffer["content"] = ""
if "arguments" not in toolcall_buffer:
toolcall_buffer["arguments"] = ""

if tool_call.function.name:
toolcall_buffer["name"] = tool_call.function.name
delta = f"{toolcall_buffer['name']}("
if tool_call.function.arguments:
toolcall_buffer["arguments"] += tool_call.function.arguments
delta = toolcall_buffer["arguments"]

toolcall_buffer["content"] += delta
yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
event_type=event_type,
delta=ToolCallDelta(
tool_call=delta,
parse_status=ToolCallParseStatus.in_progress,
),
logprobs=_convert_openai_logprobs(logprobs),
)
)
else:
for tool_call in choice.delta.tool_calls:
idx = tool_call.index if hasattr(tool_call, "index") else 0

if idx not in tool_call_idx_to_buffer:
tool_call_idx_to_buffer[idx] = {
"call_id": tool_call.id,
"name": None,
"arguments": "",
"content": "",
}

buffer = tool_call_idx_to_buffer[idx]

if tool_call.function:
if tool_call.function.name:
buffer["name"] = tool_call.function.name
delta = f"{buffer['name']}("
buffer["content"] += delta

if tool_call.function.arguments:
delta = tool_call.function.arguments
buffer["arguments"] += delta
buffer["content"] += delta

yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
event_type=event_type,
delta=ToolCallDelta(
tool_call=delta,
parse_status=ToolCallParseStatus.in_progress,
),
logprobs=_convert_openai_logprobs(logprobs),
)
)
elif choice.delta.content:
yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
event_type=event_type,
Expand All @@ -935,47 +949,51 @@ async def convert_openai_chat_completion_stream(
)
)

if toolcall_buffer:
delta = ")"
toolcall_buffer["content"] += delta
yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
event_type=event_type,
delta=ToolCallDelta(
tool_call=delta,
parse_status=ToolCallParseStatus.in_progress,
),
logprobs=_convert_openai_logprobs(logprobs),
)
)
try:
arguments = json.loads(toolcall_buffer["arguments"])
tool_call = ToolCall(
call_id=toolcall_buffer["call_id"],
tool_name=toolcall_buffer["name"],
arguments=arguments,
)
for idx, buffer in tool_call_idx_to_buffer.items():
logger.debug(f"toolcall_buffer[{idx}]: {buffer}")
if buffer["name"]:
delta = ")"
buffer["content"] += delta
yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
event_type=ChatCompletionResponseEventType.progress,
event_type=event_type,
delta=ToolCallDelta(
tool_call=tool_call,
parse_status=ToolCallParseStatus.succeeded,
tool_call=delta,
parse_status=ToolCallParseStatus.in_progress,
),
stop_reason=stop_reason,
logprobs=None,
)
)
except json.JSONDecodeError:
yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
event_type=ChatCompletionResponseEventType.complete,
delta=ToolCallDelta(
tool_call=toolcall_buffer["content"],
parse_status=ToolCallParseStatus.failed,
),
stop_reason=stop_reason,

try:
arguments = json.loads(buffer["arguments"])
tool_call = ToolCall(
call_id=buffer["call_id"],
tool_name=buffer["name"],
arguments=arguments,
)
yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
event_type=ChatCompletionResponseEventType.progress,
delta=ToolCallDelta(
tool_call=tool_call,
parse_status=ToolCallParseStatus.succeeded,
),
stop_reason=stop_reason,
)
)
except json.JSONDecodeError as e:
print(f"Failed to parse arguments: {e}")
yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
event_type=ChatCompletionResponseEventType.progress,
delta=ToolCallDelta(
tool_call=buffer["content"],
parse_status=ToolCallParseStatus.failed,
),
stop_reason=stop_reason,
)
)
)

yield ChatCompletionResponseStreamChunk(
event=ChatCompletionResponseEvent(
Expand Down
Loading