-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve UX for action processor #709
Changes from 8 commits
017a1cf
b6252ea
4f9f492
9f39a61
d1b40fc
f8f426e
36e2317
bf13af4
2fc40f3
431ce02
8d2ce33
fe574c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,6 +72,9 @@ | |
MetadataType = t.Dict[_KeyType, t.Dict] | ||
ParamType = t.TypeVar("ParamType") | ||
|
||
# Enable deprecation warnings | ||
warnings.simplefilter("always", DeprecationWarning) | ||
|
||
|
||
class ProcessorsType(te.TypedDict): | ||
"""Request and response processors.""" | ||
|
@@ -251,18 +254,24 @@ def _limit_file_search_response(response: t.Dict) -> t.Dict: | |
self._api_key = None | ||
self.logger.debug("`api_key` is not set when initializing toolset.") | ||
|
||
self._processors = ( | ||
processors | ||
if processors is not None | ||
else {"post": {}, "pre": {}, "schema": {}} | ||
) | ||
if processors is not None: | ||
warnings.warn( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The warning message here could be more specific. Instead of just mentioning 'processors', it could say "Setting 'processors' directly on the ToolSet instance is deprecated..." |
||
"Setting 'processors' on the ToolSet is deprecated, they should" | ||
"be provided to the 'get_tools()' method instead.", | ||
DeprecationWarning, | ||
stacklevel=2, | ||
) | ||
self._processors: ProcessorsType = processors | ||
else: | ||
self._processors = {"post": {}, "pre": {}, "schema": {}} | ||
|
||
self._metadata = metadata or {} | ||
self._workspace_id = workspace_id | ||
self._workspace_config = workspace_config | ||
self._local_client = LocalClient() | ||
|
||
if len(kwargs) > 0: | ||
self.logger.info(f"Extra kwards while initializing toolset: {kwargs}") | ||
self.logger.info(f"Extra kwargs while initializing toolset: {kwargs}") | ||
|
||
self.logger.debug("Loading local tools") | ||
load_local_tools() | ||
|
@@ -542,7 +551,10 @@ def _serialize_execute_params(self, param: ParamType) -> ParamType: | |
return [self._serialize_execute_params(p) for p in param] # type: ignore | ||
|
||
if isinstance(param, dict): | ||
return {key: self._serialize_execute_params(val) for key, val in param.items()} # type: ignore | ||
return { | ||
key: self._serialize_execute_params(val) # type: ignore | ||
for key, val in param.items() | ||
} | ||
|
||
raise ValueError( | ||
"Invalid value found for execute parameters" | ||
|
@@ -593,6 +605,12 @@ def _process( | |
f" through: {processor.__name__}" | ||
) | ||
data = processor(data) | ||
# Users may not respect our type annotations and return something that isn't a dict. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding more comprehensive error handling for cases where processors might fail or return unexpected types. While you've added a warning for unexpected return types, it might be beneficial to have a more robust error handling strategy, possibly including custom exceptions or fallback behavior. |
||
# If that happens we should show a friendly error message. | ||
if not isinstance(data, t.Dict): | ||
raise TypeError( | ||
f"Expected {type_}-processor to return 'dict', got {type(data).__name__!r}" | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest log a warning instead raising error since this can interrupt the agent execution and agent might get stuck in a loop, also as long as the return object is JSON serialisable there won't be any issue so just a warning should be fine. We can improve the documentation around this. |
||
return data | ||
|
||
def _process_request(self, action: Action, request: t.Dict) -> t.Dict: | ||
|
@@ -628,6 +646,22 @@ def _process_schema_properties(self, action: Action, properties: t.Dict) -> t.Di | |
type_="schema", | ||
) | ||
|
||
def _merge_processors(self, processors: ProcessorsType) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding a type hint for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding a type hint for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding a docstring for the new |
||
for processor_type in self._processors.keys(): | ||
if processor_type in processors: | ||
processor_type = t.cast( | ||
te.Literal["pre", "post", "schema"], processor_type | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can extract |
||
) | ||
new_processors = processors[processor_type] | ||
|
||
if processor_type in self._processors: | ||
existing_processors = self._processors[processor_type] | ||
else: | ||
existing_processors = {} | ||
self._processors[processor_type] = existing_processors | ||
|
||
existing_processors.update(new_processors) | ||
|
||
@_record_action_if_available | ||
def execute_action( | ||
self, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding type hints for the |
||
|
@@ -637,6 +671,8 @@ def execute_action( | |
entity_id: t.Optional[str] = None, | ||
connected_account_id: t.Optional[str] = None, | ||
text: t.Optional[str] = None, | ||
*, | ||
processors: t.Optional[ProcessorsType] = None, | ||
) -> t.Dict: | ||
""" | ||
Execute an action on a given entity. | ||
|
@@ -651,6 +687,9 @@ def execute_action( | |
""" | ||
action = Action(action) | ||
params = self._serialize_execute_params(param=params) | ||
if processors is not None: | ||
self._merge_processors(processors) | ||
|
||
if not action.is_runtime: | ||
params = self._process_request(action=action, request=params) | ||
metadata = self._add_metadata(action=action, metadata=metadata) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding a type hint for the
warnings
module to improve code readability and maintainability.