-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
Copy pathfunction_call_content.py
201 lines (172 loc) · 8.17 KB
/
function_call_content.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# Copyright (c) Microsoft. All rights reserved.
import json
import logging
from typing import TYPE_CHECKING, Any, ClassVar, Final, Literal, TypeVar
from xml.etree.ElementTree import Element # nosec
from pydantic import Field
from typing_extensions import deprecated
from semantic_kernel.contents.const import FUNCTION_CALL_CONTENT_TAG, ContentTypes
from semantic_kernel.contents.kernel_content import KernelContent
from semantic_kernel.exceptions import (
ContentAdditionException,
ContentInitializationError,
FunctionCallInvalidArgumentsException,
FunctionCallInvalidNameException,
)
if TYPE_CHECKING:
from semantic_kernel.functions.kernel_arguments import KernelArguments
logger = logging.getLogger(__name__)
_T = TypeVar("_T", bound="FunctionCallContent")
EMPTY_VALUES: Final[list[str | None]] = ["", "{}", None]
class FunctionCallContent(KernelContent):
"""Class to hold a function call response."""
content_type: Literal[ContentTypes.FUNCTION_CALL_CONTENT] = Field(FUNCTION_CALL_CONTENT_TAG, init=False) # type: ignore
tag: ClassVar[str] = FUNCTION_CALL_CONTENT_TAG
id: str | None
index: int | None = None
name: str | None = None
function_name: str
plugin_name: str | None = None
arguments: str | dict[str, Any] | None = None
def __init__(
self,
content_type: Literal[ContentTypes.FUNCTION_CALL_CONTENT] = FUNCTION_CALL_CONTENT_TAG, # type: ignore
inner_content: Any | None = None,
ai_model_id: str | None = None,
id: str | None = None,
index: int | None = None,
name: str | None = None,
function_name: str | None = None,
plugin_name: str | None = None,
arguments: str | dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""Create function call content.
Args:
content_type: The content type.
inner_content (Any | None): The inner content.
ai_model_id (str | None): The id of the AI model.
id (str | None): The id of the function call.
index (int | None): The index of the function call.
name (str | None): The name of the function call.
When not supplied function_name and plugin_name should be supplied.
function_name (str | None): The function name.
Not used when 'name' is supplied.
plugin_name (str | None): The plugin name.
Not used when 'name' is supplied.
arguments (str | dict[str, Any] | None): The arguments of the function call.
metadata (dict[str, Any] | None): The metadata of the function call.
kwargs (Any): Additional arguments.
"""
if function_name and plugin_name and not name:
name = f"{plugin_name}-{function_name}"
if name and not function_name and not plugin_name:
if "-" in name:
plugin_name, function_name = name.split("-", maxsplit=1)
else:
function_name = name
args = {
"content_type": content_type,
"inner_content": inner_content,
"ai_model_id": ai_model_id,
"id": id,
"index": index,
"name": name,
"function_name": function_name or "",
"plugin_name": plugin_name,
"arguments": arguments,
}
if metadata:
args["metadata"] = metadata
super().__init__(**args)
def __str__(self) -> str:
"""Return the function call as a string."""
if isinstance(self.arguments, dict):
return f"{self.name}({json.dumps(self.arguments)})"
return f"{self.name}({self.arguments})"
def __add__(self, other: "FunctionCallContent | None") -> "FunctionCallContent":
"""Add two function calls together, combines the arguments, ignores the name.
When both function calls have a dict as arguments, the arguments are merged,
which means that the arguments of the second function call
will overwrite the arguments of the first function call if the same key is present.
When one of the two arguments are a dict and the other a string, we raise a ContentAdditionException.
"""
if not other:
return self
if self.id and other.id and self.id != other.id:
raise ContentAdditionException("Function calls have different ids.")
if self.index != other.index:
raise ContentAdditionException("Function calls have different indexes.")
return FunctionCallContent(
id=self.id or other.id,
index=self.index or other.index,
name=self.name or other.name,
arguments=self.combine_arguments(self.arguments, other.arguments),
)
def combine_arguments(
self, arg1: str | dict[str, Any] | None, arg2: str | dict[str, Any] | None
) -> str | dict[str, Any]:
"""Combine two arguments."""
if isinstance(arg1, dict) and isinstance(arg2, dict):
return {**arg1, **arg2}
# when one of the two is a dict, and the other isn't, we raise.
if isinstance(arg1, dict) or isinstance(arg2, dict):
raise ContentAdditionException("Cannot combine a dict with a string.")
if arg1 in EMPTY_VALUES and arg2 in EMPTY_VALUES:
return "{}"
if arg1 in EMPTY_VALUES:
return arg2 or "{}"
if arg2 in EMPTY_VALUES:
return arg1 or "{}"
return (arg1 or "") + (arg2 or "")
def parse_arguments(self) -> dict[str, Any] | None:
"""Parse the arguments into a dictionary."""
if not self.arguments:
return None
if isinstance(self.arguments, dict):
return self.arguments
try:
return json.loads(self.arguments)
except json.JSONDecodeError as exc:
raise FunctionCallInvalidArgumentsException("Function Call arguments are not valid JSON.") from exc
def to_kernel_arguments(self) -> "KernelArguments":
"""Return the arguments as a KernelArguments instance."""
from semantic_kernel.functions.kernel_arguments import KernelArguments
args = self.parse_arguments()
if not args:
return KernelArguments()
return KernelArguments(**args)
@deprecated("The function_name and plugin_name properties should be used instead.")
def split_name(self) -> list[str | None]:
"""Split the name into a plugin and function name."""
if not self.function_name:
raise FunctionCallInvalidNameException("Function name is not set.")
return [self.plugin_name or "", self.function_name]
@deprecated("The function_name and plugin_name properties should be used instead.")
def split_name_dict(self) -> dict:
"""Split the name into a plugin and function name."""
return {"plugin_name": self.plugin_name, "function_name": self.function_name}
def to_element(self) -> Element:
"""Convert the function call to an Element."""
element = Element(self.tag)
if self.id:
element.set("id", self.id)
if self.name:
element.set("name", self.name)
if self.arguments:
element.text = json.dumps(self.arguments) if isinstance(self.arguments, dict) else self.arguments
return element
@classmethod
def from_element(cls: type[_T], element: Element) -> _T:
"""Create an instance from an Element."""
if element.tag != cls.tag:
raise ContentInitializationError(f"Element tag is not {cls.tag}") # pragma: no cover
return cls(name=element.get("name"), id=element.get("id"), arguments=element.text or "")
def to_dict(self) -> dict[str, str | Any]:
"""Convert the instance to a dictionary."""
args = json.dumps(self.arguments) if isinstance(self.arguments, dict) else self.arguments
return {"id": self.id, "type": "function", "function": {"name": self.name, "arguments": args}}
def __hash__(self) -> int:
"""Return the hash of the function call content."""
return hash((self.tag, self.id, self.index, self.name, self.function_name, self.plugin_name, self.arguments))