Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvement: add serialize method in monitor module and tweak executing agent in peers. #112

Merged
merged 5 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ def execute(self, input_object: InputObject, agent_input: dict) -> dict:
framework = agent_input.get('framework', [])
futures = []
for task in framework:
agent_input_copy: dict = copy.deepcopy(agent_input)
# note: agent input shallow copy.
agent_input_copy: dict = dict(agent_input)
agent_input_copy['input'] = task
planner: Planner = PlannerManager().get_instance_obj(self.agent_model.plan.get('planner').get('name'))
futures.append(
Expand All @@ -99,7 +100,8 @@ def process_intput_object(self, input_object: InputObject, subtask: str, planner
# get agent toolsets.
action: dict = self.agent_model.action or dict()
tools: list = action.get('tool') or list()
input_object_copy: InputObject = copy.deepcopy(input_object)
# note: input object shallow copy.
input_object_copy: InputObject = InputObject(input_object.to_dict())
# wrap input_object for agent knowledge.
input_object_copy.add_data(planner_input_key, subtask)
# wrap input_object for agent toolsets.
Expand Down
13 changes: 7 additions & 6 deletions agentuniverse/agent/memory/chat_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,18 @@ def as_langchain(self) -> BaseChatMemory:
max_token_limit=self.max_tokens, messages=self.messages,
prompt_version=self.prompt_version)

def set_by_agent_model(self, **kwargs) -> None:
def set_by_agent_model(self, **kwargs):
""" Assign values of parameters to the ChatMemory model in the agent configuration."""
super().set_by_agent_model(**kwargs)
copied_obj = super().set_by_agent_model(**kwargs)
if 'messages' in kwargs and kwargs['messages']:
self.messages = kwargs['messages']
copied_obj.messages = kwargs['messages']
if 'llm' in kwargs and kwargs['llm']:
self.llm = kwargs['llm']
copied_obj.llm = kwargs['llm']
if 'input_key' in kwargs and kwargs['input_key']:
self.input_key = kwargs['input_key']
copied_obj.input_key = kwargs['input_key']
if 'output_key' in kwargs and kwargs['output_key']:
self.output_key = kwargs['output_key']
copied_obj.output_key = kwargs['output_key']
return copied_obj

def initialize_by_component_configer(self, component_configer: MemoryConfiger) -> 'ChatMemory':
"""Initialize the chat memory by the ComponentConfiger object.
Expand Down
9 changes: 6 additions & 3 deletions agentuniverse/agent/memory/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ def as_langchain(self) -> BaseMemory:
"""Convert the agentUniverse(aU) memory class to the langchain memory class."""
pass

def set_by_agent_model(self, **kwargs) -> None:
def set_by_agent_model(self, **kwargs):
""" Assign values of parameters to the Memory model in the agent configuration."""
# note: default shallow copy
copied_obj = self.model_copy()
if 'memory_key' in kwargs and kwargs['memory_key']:
self.memory_key = kwargs['memory_key']
copied_obj.memory_key = kwargs['memory_key']
if 'max_tokens' in kwargs and kwargs['max_tokens']:
self.max_tokens = kwargs['max_tokens']
copied_obj.max_tokens = kwargs['max_tokens']
return copied_obj

def get_instance_code(self) -> str:
"""Return the full name of the memory."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def agents_run(self, agents: dict, planner_config: dict, agent_input: dict, inpu
else:
LOGGER.info(f"Starting reviewing agent.")
reviewing_result = reviewingAgent.run(**input_object.to_dict())

input_object.add_data('reviewing_result', reviewing_result)

# add reviewing agent log info
Expand Down
10 changes: 4 additions & 6 deletions agentuniverse/agent/plan/planner/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,10 @@ def handle_memory(self, agent_model: AgentModel, planner_input: dict) -> ChatMem
params['input_key'] = self.input_key
params['output_key'] = self.output_key

memory: ChatMemory = MemoryManager().get_instance_obj(component_instance_name=memory_name, new_instance=True)
memory: ChatMemory = MemoryManager().get_instance_obj(component_instance_name=memory_name)
if memory is None:
return None
memory.set_by_agent_model(**params)
return memory
return memory.set_by_agent_model(**params)

def run_all_actions(self, agent_model: AgentModel, planner_input: dict, input_object: InputObject):
"""Tool and knowledge processing.
Expand Down Expand Up @@ -142,9 +141,8 @@ def handle_llm(self, agent_model: AgentModel) -> LLM:
LLM: The language model.
"""
llm_name = agent_model.profile.get('llm_model').get('name')
llm: LLM = LLMManager().get_instance_obj(component_instance_name=llm_name, new_instance=True)
llm.set_by_agent_model(**agent_model.profile.get('llm_model'))
return llm
llm: LLM = LLMManager().get_instance_obj(component_instance_name=llm_name)
return llm.set_by_agent_model(**agent_model.profile.get('llm_model'))

def initialize_by_component_configer(self, component_configer: PlannerConfiger) -> 'Planner':
"""Initialize the planner by the PlannerConfiger object.
Expand Down
18 changes: 1 addition & 17 deletions agentuniverse/base/annotation/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,8 @@ def wrapper_sync(*args, **kwargs):

# invoke function
result = func(*args, **kwargs)
# serialize agent input and output dict
agent_input = serialize_agent_invocation(agent_input)
agent_output = serialize_agent_invocation(result)
# add agent invocation info to monitor
Monitor().trace_agent_invocation(source=source, agent_input=agent_input, agent_output=agent_output)
Monitor().trace_agent_invocation(source=source, agent_input=agent_input, agent_output=result)
return result

# sync function
Expand All @@ -145,16 +142,3 @@ def _get_agent_input(func, *args, **kwargs) -> dict:
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()
return {k: v for k, v in bound_args.arguments.items()}


def default_serializer(obj):
if isinstance(obj, InputObject):
return obj.to_dict()
elif isinstance(obj, OutputObject):
return obj.to_dict()
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def serialize_agent_invocation(agent_invocation):
agent_invocation_serialized = json.loads(json.dumps(agent_invocation, default=default_serializer))
return agent_invocation_serialized
2 changes: 1 addition & 1 deletion agentuniverse/base/component/component_manager_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def unregister(self, component_instance_name: str):
self._instance_obj_map.pop(component_instance_name)

def get_instance_obj(self, component_instance_name: str,
appname: str = None, new_instance: bool = None) -> ComponentTypeVar:
appname: str = None, new_instance: bool = False) -> ComponentTypeVar:
"""Return the component instance object."""
appname = appname or ApplicationConfigManager().app_configer.base_info_appname
instance_code = f'{appname}.{self._component_type.value.lower()}.{component_instance_name}'
Expand Down
61 changes: 53 additions & 8 deletions agentuniverse/base/util/monitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

from pydantic import BaseModel

from agentuniverse.agent.input_object import InputObject
from agentuniverse.agent.output_object import OutputObject
from agentuniverse.base.annotation.singleton import singleton
from agentuniverse.base.config.configer import Configer

Expand Down Expand Up @@ -57,12 +59,6 @@ def trace_llm_invocation(self, source: str, llm_input: Union[str, dict], llm_out
with jsonlines.open(path_save, 'a') as writer:
writer.write(llm_invocation)

def _get_or_create_subdir(self, subdir: str) -> str:
"""Get or create a subdirectory if it doesn't exist in the monitor directory."""
path = os.path.join(self.dir, subdir)
os.makedirs(path, exist_ok=True)
return path

def trace_agent_invocation(self, source: str, agent_input: Union[str, dict],
agent_output: Union[str, dict]) -> None:
"""Trace the agent invocation and save it to the monitor jsonl file."""
Expand All @@ -78,8 +74,8 @@ def trace_agent_invocation(self, source: str, agent_input: Union[str, dict],
agent_invocation = {
"source": source,
"date": date.strftime("%Y-%m-%d %H:%M:%S"),
"agent_input": agent_input,
"agent_output": agent_output,
"agent_input": self.serialize_obj(agent_input),
"agent_output": self.serialize_obj(agent_output),
}
# files are stored in hours
filename = f"agent_{source}_{date.strftime('%Y-%m-%d-%H')}.jsonl"
Expand All @@ -90,3 +86,52 @@ def trace_agent_invocation(self, source: str, agent_input: Union[str, dict],
with jsonlines.open(path_save, 'a') as writer:
writer.write(agent_invocation)

def _get_or_create_subdir(self, subdir: str) -> str:
"""Get or create a subdirectory if it doesn't exist in the monitor directory."""
path = os.path.join(self.dir, subdir)
os.makedirs(path, exist_ok=True)
return path

@staticmethod
def default_serializer(obj):
"""Default serializer for objects."""
if isinstance(obj, InputObject):
return obj.to_dict()
elif isinstance(obj, OutputObject):
return obj.to_dict()
elif isinstance(obj, BaseModel):
try:
return obj.dict()
except TypeError:
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
else:
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

def serialize_obj(self, obj):
"""Serialize an object and filter out non-serializable values."""
filtered_obj = self.filter_and_serialize(obj)
return json.loads(json.dumps(filtered_obj, default=self.default_serializer))

def filter_and_serialize(self, obj):
"""Recursively filter out non-serializable values from an object."""

def is_json_serializable(value):
"""Check if value is a JSON serializable object."""
try:
json.dumps(value, default=self.default_serializer)
return True
except (TypeError, OverflowError):
return False

def filter_dict(d):
return {k: v for k, v in d.items() if is_json_serializable(v)}

def recursive_filter(o):
if isinstance(o, dict):
return filter_dict({k: recursive_filter(v) for k, v in o.items()})
elif isinstance(o, list):
return [recursive_filter(i) for i in o if is_json_serializable(i)]
else:
return o

return recursive_filter(obj)
19 changes: 11 additions & 8 deletions agentuniverse/llm/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,25 @@ def initialize_by_component_configer(self, component_configer: LLMConfiger) -> '
self._max_context_length = component_configer.configer.value['max_context_length']
return self

def set_by_agent_model(self, **kwargs) -> None:
def set_by_agent_model(self, **kwargs):
""" Assign values of parameters to the LLM model in the agent configuration."""
# note: default shallow copy
copied_obj = self.model_copy()
if 'model_name' in kwargs and kwargs['model_name']:
self.model_name = kwargs['model_name']
copied_obj.model_name = kwargs['model_name']
if 'temperature' in kwargs and kwargs['temperature']:
self.temperature = kwargs['temperature']
copied_obj.temperature = kwargs['temperature']
if 'request_timeout' in kwargs and kwargs['request_timeout']:
self.request_timeout = kwargs['request_timeout']
copied_obj.request_timeout = kwargs['request_timeout']
if 'max_tokens' in kwargs and kwargs['max_tokens']:
self.max_tokens = kwargs['max_tokens']
copied_obj.max_tokens = kwargs['max_tokens']
if 'max_retries' in kwargs and kwargs['max_retries']:
self.max_retries = kwargs['max_retries']
copied_obj.max_retries = kwargs['max_retries']
if 'streaming' in kwargs and kwargs['streaming']:
self.streaming = kwargs['streaming']
copied_obj.streaming = kwargs['streaming']
if 'max_context_length' in kwargs and kwargs['max_context_length']:
self._max_context_length = kwargs['max_context_length']
copied_obj._max_context_length = kwargs['max_context_length']
return copied_obj

def max_context_length(self) -> int:
"""Max context length.
Expand Down
19 changes: 9 additions & 10 deletions agentuniverse/llm/openai_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOu
**kwargs: Arbitrary keyword arguments.
"""
streaming = kwargs.pop("streaming") if "streaming" in kwargs else self.streaming
if self.client is None:
self.client = self._new_client()
self.client = self._new_client()
chat_completion = self.client.chat.completions.create(
messages=messages,
model=kwargs.pop('model', self.model_name),
Expand All @@ -116,8 +115,7 @@ async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIt
**kwargs: Arbitrary keyword arguments.
"""
streaming = kwargs.pop("streaming") if "streaming" in kwargs else self.streaming
if self.async_client is None:
self.async_client = self._new_async_client()
self.async_client = self._new_async_client()
chat_completion = await self.async_client.chat.completions.create(
messages=messages,
model=kwargs.pop('model', self.model_name),
Expand All @@ -135,17 +133,18 @@ def as_langchain(self) -> BaseLanguageModel:
"""Convert the agentUniverse(aU) openai llm class to the langchain openai llm class."""
return LangchainOpenAI(self)

def set_by_agent_model(self, **kwargs) -> None:
def set_by_agent_model(self, **kwargs):
""" Assign values of parameters to the OpenAILLM model in the agent configuration."""
super().set_by_agent_model(**kwargs)
copied_obj = super().set_by_agent_model(**kwargs)
if 'openai_api_key' in kwargs and kwargs['openai_api_key']:
self.openai_api_key = kwargs['openai_api_key']
copied_obj.openai_api_key = kwargs['openai_api_key']
if 'openai_api_base' in kwargs and kwargs['openai_api_base']:
self.openai_api_base = kwargs['openai_api_base']
copied_obj.openai_api_base = kwargs['openai_api_base']
if 'openai_proxy' in kwargs and kwargs['openai_proxy']:
self.openai_proxy = kwargs['openai_proxy']
copied_obj.openai_proxy = kwargs['openai_proxy']
if 'openai_client_args' in kwargs and kwargs['openai_client_args']:
self.openai_client_args = kwargs['openai_client_args']
copied_obj.openai_client_args = kwargs['openai_client_args']
return copied_obj

def max_context_length(self) -> int:
"""Max context length.
Expand Down
12 changes: 7 additions & 5 deletions agentuniverse/llm/openai_style_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,17 @@ def as_langchain(self) -> BaseLanguageModel:

def set_by_agent_model(self, **kwargs) -> None:
""" Assign values of parameters to the OpenAILLM model in the agent configuration."""
super().set_by_agent_model(**kwargs)
copied_obj = super().set_by_agent_model(**kwargs)
if 'api_key' in kwargs and kwargs['api_key']:
self.api_key = kwargs['api_key']
copied_obj.api_key = kwargs['api_key']
if 'api_base' in kwargs and kwargs['api_base']:
self.api_base = kwargs['api_base']
copied_obj.api_base = kwargs['api_base']
if 'proxy' in kwargs and kwargs['proxy']:
self.proxy = kwargs['proxy']
copied_obj.proxy = kwargs['proxy']
if 'client_args' in kwargs and kwargs['client_args']:
self.client_args = kwargs['client_args']
copied_obj.client_args = kwargs['client_args']
return copied_obj


@staticmethod
def parse_result(chunk):
Expand Down
2 changes: 1 addition & 1 deletion docs/guidebook/en/2_2_5_Memory_Define_And_Use.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ You can get the Memory instance corresponding to its name using the `.get_instan
```python
from agentuniverse.agent.memory.memory_manager import MemoryManager

memory = MemoryManager().get_instance_obj(component_instance_name=memory_name, new_instance=True)
memory = MemoryManager().get_instance_obj(component_instance_name=memory_name)
```

# Conclusion
Expand Down
2 changes: 1 addition & 1 deletion docs/guidebook/zh/2_2_5_记忆定义与使用.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ chat_history=[{"content": "你好", "type": "human"}, {"content": "你好", "typ
```python
from agentuniverse.agent.memory.memory_manager import MemoryManager

memory = MemoryManager().get_instance_obj(component_instance_name=memory_name, new_instance=True)
memory = MemoryManager().get_instance_obj(component_instance_name=memory_name)
```

# 总结
Expand Down