-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
Copy pathopenapi_runner.py
184 lines (156 loc) · 7.75 KB
/
openapi_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Copyright (c) Microsoft. All rights reserved.
import json
import logging
from collections import OrderedDict
from collections.abc import Awaitable, Callable, Mapping
from inspect import isawaitable
from typing import Any
from urllib.parse import urlparse, urlunparse
import httpx
from openapi_core import Spec
from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation import RestApiOperation
from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_expected_response import (
RestApiOperationExpectedResponse,
)
from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_payload import RestApiOperationPayload
from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_run_options import RestApiOperationRunOptions
from semantic_kernel.exceptions.function_exceptions import FunctionExecutionException
from semantic_kernel.functions.kernel_arguments import KernelArguments
from semantic_kernel.utils.experimental_decorator import experimental_class
from semantic_kernel.utils.telemetry.user_agent import APP_INFO, prepend_semantic_kernel_to_user_agent
logger: logging.Logger = logging.getLogger(__name__)
@experimental_class
class OpenApiRunner:
"""The OpenApiRunner that runs the operations defined in the OpenAPI manifest."""
payload_argument_name = "payload"
media_type_application_json = "application/json"
def __init__(
self,
parsed_openapi_document: Mapping[str, str],
auth_callback: Callable[..., dict[str, str] | Awaitable[dict[str, str]]] | None = None,
http_client: httpx.AsyncClient | None = None,
enable_dynamic_payload: bool = True,
enable_payload_namespacing: bool = False,
):
"""Initialize the OpenApiRunner."""
self.spec = Spec.from_dict(parsed_openapi_document) # type: ignore
self.auth_callback = auth_callback
self.http_client = http_client
self.enable_dynamic_payload = enable_dynamic_payload
self.enable_payload_namespacing = enable_payload_namespacing
def build_full_url(self, base_url, query_string):
"""Build the full URL."""
url_parts = list(urlparse(base_url))
url_parts[4] = query_string
return urlunparse(url_parts)
def build_operation_url(
self, operation: RestApiOperation, arguments: KernelArguments, server_url_override=None, api_host_url=None
):
"""Build the operation URL."""
url = operation.build_operation_url(arguments, server_url_override, api_host_url)
return self.build_full_url(url, operation.build_query_string(arguments))
def build_json_payload(
self, payload_metadata: RestApiOperationPayload, arguments: dict[str, Any]
) -> tuple[str, str]:
"""Build the JSON payload."""
if self.enable_dynamic_payload:
if payload_metadata is None:
raise FunctionExecutionException(
"Payload can't be built dynamically due to the missing payload metadata."
)
payload = self.build_json_object(payload_metadata.properties, arguments)
content = json.dumps(payload)
return content, payload_metadata.media_type
argument = arguments.get(self.payload_argument_name)
if not isinstance(argument, str):
raise FunctionExecutionException(f"No payload is provided by the argument '{self.payload_argument_name}'.")
return argument, argument
def build_json_object(self, properties, arguments, property_namespace=None):
"""Build the JSON payload object."""
result = {}
for property_metadata in properties:
argument_name = self.get_argument_name_for_payload(property_metadata.name, property_namespace)
if property_metadata.type == "object":
node = self.build_json_object(property_metadata.properties, arguments, argument_name)
result[property_metadata.name] = node
continue
property_value = arguments.get(argument_name)
if property_value is not None:
result[property_metadata.name] = property_value
continue
if property_metadata.is_required:
raise FunctionExecutionException(
f"No argument is found for the '{property_metadata.name}' payload property."
)
return result
def build_operation_payload(
self, operation: RestApiOperation, arguments: KernelArguments
) -> tuple[str, str] | tuple[None, None]:
"""Build the operation payload."""
if operation.request_body is None and self.payload_argument_name not in arguments:
return None, None
if operation.request_body is not None:
return self.build_json_payload(operation.request_body, arguments)
return None, None
def get_argument_name_for_payload(self, property_name, property_namespace=None):
"""Get argument name for the payload."""
if not self.enable_payload_namespacing:
return property_name
return f"{property_namespace}.{property_name}" if property_namespace else property_name
def _get_first_response_media_type(
self, responses: OrderedDict[str, RestApiOperationExpectedResponse] | None
) -> str:
if responses:
first_response = next(iter(responses.values()))
return first_response.media_type if first_response.media_type else self.media_type_application_json
return self.media_type_application_json
async def run_operation(
self,
operation: RestApiOperation,
arguments: KernelArguments | None = None,
options: RestApiOperationRunOptions | None = None,
) -> str:
"""Runs the operation defined in the OpenAPI manifest."""
if not arguments:
arguments = KernelArguments()
url = self.build_operation_url(
operation=operation,
arguments=arguments,
server_url_override=options.server_url_override if options else None,
api_host_url=options.api_host_url if options else None,
)
headers = operation.build_headers(arguments=arguments)
payload, _ = self.build_operation_payload(operation=operation, arguments=arguments)
if self.auth_callback:
headers_update = self.auth_callback(**headers)
if isawaitable(headers_update):
headers_update = await headers_update
# at this point, headers_update is a valid dictionary
headers.update(headers_update) # type: ignore
if APP_INFO:
headers.update(APP_INFO)
headers = prepend_semantic_kernel_to_user_agent(headers)
if "Content-Type" not in headers:
responses = (
operation.responses
if isinstance(operation.responses, OrderedDict)
else OrderedDict(operation.responses or {})
)
headers["Content-Type"] = self._get_first_response_media_type(responses)
async def fetch():
async def make_request(client: httpx.AsyncClient):
merged_headers = client.headers.copy()
merged_headers.update(headers)
response = await client.request(
method=operation.method,
url=url,
headers=merged_headers,
json=json.loads(payload) if payload else None,
)
response.raise_for_status()
return response.text
if hasattr(self, "http_client") and self.http_client is not None:
return await make_request(self.http_client)
async with httpx.AsyncClient() as client:
return await make_request(client)
return await fetch()