diff --git a/README.md b/README.md index 905c6673..82fecefc 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # AIOS: LLM Agent Operating System - - + + [![Code License](https://img.shields.io/badge/Code%20License-MIT-green.svg)](https://github.com/agiresearch/AIOS/blob/main/LICENSE) AIOS, a Large Language Model (LLM) Agent operating system, embeds large language model into Operating Systems (OS) as the brain of the OS, enabling an operating system "with soul" -- an important step towards AGI. AIOS is designed to optimize resource allocation, facilitate context switch across agents, enable concurrent execution of agents, provide tool service for agents, maintain access control for agents, and provide a rich set of toolkits for LLM Agent developers. @@ -23,39 +23,51 @@ AIOS, a Large Language Model (LLM) Agent operating system, embeds large language ```bash git clone https://github.com/agiresearch/AIOS.git ``` -**Make sure you have Python >= 3.9 and <= 3.11** -Install the required packages using pip +**Make sure you have Python >= 3.9 and <= 3.11** +Install the required packages using pip ```bash pip install -r requirements.txt ``` ### Usage - -### Run with locally-deployed LLM -Set up [Hugging Face token](https://huggingface.co/settings/tokens) and cache directory +If you use open-sourced models from huggingface, you need to setup your [Hugging Face token](https://huggingface.co/settings/tokens) and cache directory ```bash export HUGGING_FACE_HUB_TOKEN= export HF_HOME= ``` -Replace the max_gpu_memory and eval_device with your own and run +If you use LLM APIs like Gemini-pro, you need to setup your [Gemini API Key](https://aistudio.google.com/app/apikey) +```bash +export GEMINI_API_KEY= +``` +Here we provide two modes to run the AIOS: interactive mode and deployment mode +#### Interactive Mode +In the interactive mode, you can interact with AIOS to see the output of each step in running multiple agents ```python -# Use Gemma-2b-it +# Use Gemma-2b-it, replace the max_gpu_memory and eval_device with your own and run python main.py --llm_name gemma-2b-it --max_gpu_memory '{"0": "24GB"}' --eval_device "cuda:0" --max_new_tokens 256 ``` - ```python -# Use Mixtral-8x7b-it +# Use Mixtral-8x7b-it, replace the max_gpu_memory and eval_device with your own and run python main.py --llm_name mixtral-8x7b-it --max_gpu_memory '{"0": "48GB", "1": "48GB", "2": "48GB"}' --eval_device "cuda:0" --max_new_tokens 256 ``` -### Run with LLM API -Run with Gemini-pro, setup [Gemini API Key](https://aistudio.google.com/app/apikey) -```bash -export GEMINI_API_KEY= +```python +# Use Gemini-pro, run with Gemini-pro +python main.py --llm_name gemini-pro +``` +#### Deployment Mode +In the deployment mode, the outputs of running agents are stored in files. And in this mode, you are provided with multiple commands to run agents and see resource usage of agents (e.g., run \: \, print agent) +```python +# Use Gemma-2b-it, replace the max_gpu_memory and eval_device with your own and run +python simulator.py --llm_name gemma-2b-it --max_gpu_memory '{"0": "24GB"}' --eval_device "cuda:0" --max_new_tokens 256 --scheduler_log_mode file --agent_log_mode file +``` +```python +# Use Mixtral-8x7b-it +python simulator.py --llm_name mixtral-8x7b-it --max_gpu_memory '{"0": "48GB", "1": "48GB", "2": "48GB"}' --eval_device "cuda:0" --max_new_tokens 256 --scheduler_log_mode file --agent_log_mode file ``` ```python # Use Gemini-pro -python main.py --llm_name gemini-pro +python simulator.py --llm_name gemini-pro --scheduler_log_mode file --agent_log_mode file ``` ## 🖋️ References @@ -80,7 +92,7 @@ AIOS is dedicated to facilitating LLM agents' development and deployment in a sy For detailed information on how to contribute, see [CONTRIBUTE](https://github.com/agiresearch/AIOS/blob/main/CONTRIBUTE.md). If you would like to contribute to the codebase, [issues](https://github.com/agiresearch/AIOS/issues) or [pull requests](https://github.com/agiresearch/AIOS/pulls) are always welcome! ## 🌟 Discord Channel -If you would like to join the community, ask questions, chat with fellows, learn about or propose new features, and participate in future developments, join our [Discord Community](https://discord.gg/aUg3b2Kd)! +If you would like to join the community, ask questions, chat with fellows, learn about or propose new features, and participate in future developments, join our [Discord Community](https://discord.gg/aUg3b2Kd)! ## 🌍 AIOS Contributors diff --git a/main.py b/main.py index db8ce5f7..21e75615 100644 --- a/main.py +++ b/main.py @@ -42,40 +42,43 @@ def main(): llm_name = args.llm_name max_gpu_memory = args.max_gpu_memory max_new_tokens = args.max_new_tokens + scheduler_log_mode = args.scheduler_log_mode + agent_log_mode = args.agent_log_mode llm = llms.LLMKernel(llm_name, max_gpu_memory, max_new_tokens) # start the scheduler - scheduler = FIFOScheduler(llm) + scheduler = FIFOScheduler( + llm = llm, + log_mode = scheduler_log_mode + ) scheduler.start() agent_factory = AgentFactory( llm = llm, - agent_process_queue = scheduler.agent_process_queue + agent_process_queue = scheduler.agent_process_queue, + agent_log_mode = agent_log_mode ) - # assign maximum number of agents that can run in parallel - agent_thread_pool = ThreadPoolExecutor(max_workers=64) - # construct agents math_agent = agent_factory.activate_agent( - agent_name = "MathAgent", + agent_name = "MathAgent", task_input = "Solve the problem that Albert is wondering how much pizza he can eat in one day. He buys 2 large pizzas and 2 small pizzas. A large pizza has 16 slices and a small pizza has 8 slices. If he eats it all, how many pieces does he eat that day?", ) narrative_agent = agent_factory.activate_agent( - agent_name = "NarrativeAgent", + agent_name = "NarrativeAgent", task_input = "Craft a tale about a valiant warrior on a quest to uncover priceless treasures hidden within a mystical island.", ) - + rec_agent = agent_factory.activate_agent( - agent_name = "RecAgent", + agent_name = "RecAgent", task_input = "I want to take a tour to New York during the spring break, recommend some restaurants around for me.", ) agents = [math_agent, narrative_agent, rec_agent] # run agents concurrently - tasks = [agent_thread_pool.submit(agent.run) for agent in agents] + tasks = [agent_factory.agent_thread_pool.submit(agent.run) for agent in agents] for r in as_completed(tasks): res = r.result() diff --git a/simulator.py b/simulator.py new file mode 100644 index 00000000..6443478c --- /dev/null +++ b/simulator.py @@ -0,0 +1,95 @@ +import os +import sys +import json + +from src.command_parser import ( + PunctuationParser, + ChatGPTParser +) + +from src.command_executor import ( + Executor +) + +from src.scheduler.fifo_scheduler import FIFOScheduler + +from src.utils.utils import ( + parse_global_args, + logger +) + +from src.agents.agent_factory import AgentFactory + +import warnings + +from src.llms import llms + +from src.agents.math_agent.math_agent import MathAgent + +from src.agents.narrative_agent.narrative_agent import NarrativeAgent + +from src.agents.rec_agent.rec_agent import RecAgent + +from src.agents.travel_agent.travel_agent import TravelAgent + +from concurrent.futures import ThreadPoolExecutor, as_completed + +from src.utils.utils import logger + +def main(): + warnings.filterwarnings("ignore") + parser = parse_global_args() + args = parser.parse_args() + + llm_name = args.llm_name + max_gpu_memory = args.max_gpu_memory + max_new_tokens = args.max_new_tokens + scheduler_log_mode = args.scheduler_log_mode + agent_log_mode = args.agent_log_mode + + llm = llms.LLMKernel(llm_name, max_gpu_memory, max_new_tokens) + + # start the scheduler + scheduler = FIFOScheduler( + llm = llm, + log_mode = scheduler_log_mode + ) + + agent_factory = AgentFactory( + llm = llm, + agent_process_queue = scheduler.agent_process_queue, + agent_log_mode = agent_log_mode + ) + + parser = PunctuationParser( + llm = llm + ) + + executor = Executor(agent_factory=agent_factory) + + scheduler.start() + + # agent_factory.start() # TODO add gabage recycle of agent ID + + while True: + try: + # Read a command line input + command_line = input(f"[{llm_name}]>") + if command_line.strip().lower() == "exit": + print("Exiting...") + break + + # Parse command + tokens = parser.parse(command_line) + # Execute the command + executor.execute(tokens) + + except KeyboardInterrupt: + # Handle Ctrl+C gracefully + print("\nUse 'exit' to quit the shell.") + except EOFError: + pass + scheduler.stop() + +if __name__ == "__main__": + main() diff --git a/src/agents/agent_factory.py b/src/agents/agent_factory.py index 0460bb0d..57d516c0 100644 --- a/src/agents/agent_factory.py +++ b/src/agents/agent_factory.py @@ -4,44 +4,92 @@ from src.utils.global_param import ( MAX_AID, - agent_pool, agent_table ) + +from concurrent.futures import ThreadPoolExecutor, as_completed + +from threading import Thread class AgentFactory: - def __init__(self, llm, agent_process_queue): + def __init__(self, llm, agent_process_queue, agent_log_mode): self.MAX_AID = MAX_AID self.llm = llm self.aid_pool = [i for i in range(self.MAX_AID)] heapq.heapify(self.aid_pool) self.agent_process_queue = agent_process_queue - + self.agent_table = agent_table - self.agent_pool = agent_pool - + self.current_agents = {} + + self.agent_thread_pool = ThreadPoolExecutor(max_workers=64) + + self.thread = Thread(target=self.deactivate_agent) + + self.agent_log_mode = agent_log_mode + def activate_agent(self, agent_name, task_input): agent = self.agent_table[agent_name]( - agent_name = agent_name, + agent_name = agent_name, task_input = task_input, llm = self.llm, - agent_process_queue = self.agent_process_queue + agent_process_queue = self.agent_process_queue, + log_mode = self.agent_log_mode ) aid = heapq.heappop(self.aid_pool) - + agent.set_aid(aid) time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - agent.set_status("Active") + + agent.set_status("active") agent.set_created_time(time) - - self.agent_pool[aid] = agent + + self.current_agents[aid] = agent return agent - - def deactivate_agent(self, aid): - self.agent_pool.popitem(aid) - heapq.heappush(self.aid_pool, aid) - # self.aid_pool.heappush(aid) - - def print(self): - for aid, agent in agent_pool: - print(f"| Agent ID: {aid} | Agent Name: {agent.agent_name} | Status: {agent.get_status()} | Activated Time: {agent.get_created_time()} |") - + + def print_agent(self): + headers = ["Agent ID", "Agent Name", "Created Time", "Status"] + data = [] + for id, agent in self.current_agents.items(): + data.append( + [id, agent.agent_name, agent.created_time, agent.status] + ) + self.print(headers=headers, data=data) + + + def print(self, headers, data): + # align output + column_widths = [ + max(len(str(row[i])) for row in [headers] + data) for i in range(len(headers)) + ] + print("-" * (sum(column_widths) + len(headers) * 3 - 1)) + print(self.format_row(headers, column_widths)) + print("-" * (sum(column_widths) + len(headers) * 3 - 1)) + for row in data: + print(self.format_row(row, column_widths)) + print("-" * (sum(column_widths) + len(headers) * 3 - 1)) + + + def format_row(self, row, widths, align="<"): + row_str = " | ".join(f"{str(item):{align}{widths[i]}}" for i, item in enumerate(row)) + return row_str + + def deactivate_agent(self): + import time + while True: + invalid_aids = [] + items = self.current_agents.items() + for aid, agent in items: + if agent.get_status() == "Done": + agent.set_status("Inactive") + time.sleep(5) + invalid_aids.append(aid) + for aid in invalid_aids: + self.current_agents.pop(aid) + heapq.heappush(self.aid_pool, aid) + + def start(self): + """start the factory to check inactive agent""" + self.thread.start() + + def stop(self): + self.thread.join() diff --git a/src/agents/base.py b/src/agents/base.py index 6553214e..f9777585 100644 --- a/src/agents/base.py +++ b/src/agents/base.py @@ -7,9 +7,7 @@ # AgentProcessQueue ) -from src.utils.utils import ( - logger -) +import logging import time @@ -27,7 +25,13 @@ def join(self): return self._return class BaseAgent: - def __init__(self, agent_name, task_input, llm, agent_process_queue): + def __init__(self, + agent_name, + task_input, + llm, + agent_process_queue, + log_mode: str + ): self.agent_name = agent_name self.config = self.load_config() self.prefix = " ".join(self.config["description"]) @@ -35,19 +39,41 @@ def __init__(self, agent_name, task_input, llm, agent_process_queue): self.llm = llm self.agent_process_queue = agent_process_queue - logger.info(agent_name + " has been initialized.") - # print(f"Initialized time: {self.initialized_time}") - - # self.memory_pool = SingleMemory() - + self.log_mode = log_mode + self.logger = self.setup_logger() + self.logger.info(f"[{agent_name}]" + " has been initialized.") + self.set_status("Active") self.set_created_time(time) - - + def run(self): '''Execute each step to finish the task.''' pass + def setup_logger(self): + logger = logging.getLogger(f"{self.agent_name} Logger") + # logger.setLevel(logging.INFO) # Set the minimum logging level + logger.disabled = True + date_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # Provide two log modes: console and file + + if self.log_mode == "console": + logger.disabled = False + handler = logging.StreamHandler() + handler.setLevel(logging.INFO) # Set logging level for console output + else: + assert self.log_mode == "file" + log_dir = os.path.join(os.getcwd(), "logs", "agents", + f"{self.agent_name}") + if not os.path.exists(log_dir): + os.makedirs(log_dir) + log_file = os.path.join(log_dir, f"{date_time}.txt") + handler = logging.FileHandler(log_file) + handler.setLevel(logging.INFO) # Set logging + + logger.addHandler(handler) # enabled when run in a simulated shell + return logger + def load_config(self): config_file = os.path.join(os.getcwd(), "src", "agents", "agent_config/{}.json".format(self.agent_name)) with open(config_file, "r") as f: @@ -78,9 +104,9 @@ def listen(self, agent_process): """ while agent_process.get_response() is None: time.sleep(0.2) - + return agent_process.get_response() - + def check_tool_use(self, prompt, tool_info, temperature=0.): prompt = f'You are allowed to use the following tools: \n\n```{tool_info}```\n\n' \ @@ -89,12 +115,12 @@ def check_tool_use(self, prompt, tool_info, temperature=0.): response, waiting_time, turnaround_time = self.get_response(prompt, temperature) # print(f'Tool use check: {response}') if 'yes' in response.lower(): - return True + return True, waiting_time, turnaround_time if 'no' in response.lower(): - return False + return False, waiting_time, turnaround_time # print(f'Temperature: {temperature}') - - logger.error('No valid format output when calling "Tool use check".') + + self.logger.error('No valid format output when calling "Tool use check".') return None, waiting_time, turnaround_time # exit(1) @@ -124,7 +150,7 @@ def check_tool_name(self, prompt, tool_list, temperature=0.): return tool_list[response - 1], waiting_time, turnaround_time else: return None, waiting_time, turnaround_time - + def check_branch(self, prompt, flow_ptr, temperature=0.): possible_keys = list(flow_ptr.branch.keys()) prompt = f'Choose the closest representation of ```{prompt}``` from the following options:\n' @@ -137,7 +163,7 @@ def check_branch(self, prompt, flow_ptr, temperature=0.): return possible_keys[response - 1], waiting_time, turnaround_time else: return None, waiting_time, turnaround_time - + def get_final_result(self, prompt): prompt = f"Given the interaction history: {prompt}, give the answer to the task input and don't be verbose!" @@ -172,4 +198,3 @@ def get_created_time(self): def parse_result(self, prompt): pass - \ No newline at end of file diff --git a/src/agents/math_agent/math_agent.py b/src/agents/math_agent/math_agent.py index de3430f7..fdcfff1f 100644 --- a/src/agents/math_agent/math_agent.py +++ b/src/agents/math_agent/math_agent.py @@ -10,10 +10,6 @@ AgentProcess ) -from src.utils.utils import ( - logger -) - import argparse from concurrent.futures import as_completed @@ -22,24 +18,30 @@ from src.agents.flow import Flow -# from src.tools.online.currency_converter import CurrencyConverterAPI +from src.tools.online.currency_converter import CurrencyConverterAPI from src.tools.online.wolfram_alpha import WolframAlpha - class MathAgent(BaseAgent): - def __init__(self, agent_name, task_input, llm, agent_process_queue): - BaseAgent.__init__(self, agent_name, task_input, llm, agent_process_queue) + def __init__(self, + agent_name, + task_input, + llm, agent_process_queue, + log_mode: str + ): + BaseAgent.__init__(self, agent_name, task_input, llm, agent_process_queue, log_mode) self.tool_list = { # "currency_converter": CurrencyConverterAPI, - "wolfram_alpha": WolframAlpha + "wolfram_alpha": WolframAlpha, + "currency_converter": CurrencyConverterAPI } self.tool_check_max_fail_times = 10 self.tool_calling_max_fail_times = 10 self.tool_info = "".join(self.config["tool_info"]) - + self.load_flow() + def load_flow(self): self.flow_ptr = Flow(self.config["flow"]) - + def run(self): prompt = "" prefix = self.prefix @@ -48,9 +50,9 @@ def run(self): waiting_times = [] turnaround_times = [] task_input = "The task you need to solve is: " + task_input - logger.info(f"[{self.agent_name}] {task_input}\n") + self.logger.info(f"[{self.agent_name}] {task_input}\n") prompt += task_input - + # predefined steps steps = [ "identify and outline the sub-problems that need to be solved as stepping stones toward the solution. ", @@ -59,13 +61,13 @@ def run(self): ] for i, step in enumerate(steps): prompt += f"\nIn step {i+1}, you need to {step}. Output should focus on current step and don't be verbose!" - logger.info(f"[{self.agent_name}] Step {i+1}: {step}\n") + self.logger.info(f"[{self.agent_name}] Step {i+1}: {step}\n") response, waiting_time, turnaround_time = self.get_response(prompt) waiting_times.append(waiting_time) turnaround_times.append(turnaround_time) prompt += f"The solution to step {i+1} is: {response}\n" - logger.info(f"[{self.agent_name}] The solution to step {i+1}: {response}\n") - + self.logger.info(f"[{self.agent_name}] The solution to step {i+1}: {response}\n") + # TODO test workflow of MathAgent # round_id = 1 # flow_ptr = self.flow_ptr.header @@ -73,13 +75,18 @@ def run(self): # current_progress = [] # questions, answers, output_record = [], [], [] # record each round: question, LLM output, tool output (if exists else LLM output) # while True: - # prompt = self.get_prompt(self.tool_info, flow_ptr, task_input, current_progress) - # res, waiting_time,turnaround_time = self.get_response(prompt) + # query = self.get_prompt(self.tool_info, flow_ptr, task_input, current_progress) + # prompt += query + # res, waiting_time,turnaround_time = self.get_response(query) + + # prompt += res # waiting_times.append(waiting_time) # turnaround_times.append(turnaround_time) # questions.append(str(flow_ptr)) - # answers.append(str(res)) + # answers.append(res) + + # prompt += res # current_progress.append(f'Question {round_id}: ```{flow_ptr.get_instruction()}```') # current_progress.append(f'Answer {round_id}: ```{res}```') @@ -136,9 +143,9 @@ def run(self): turnaround_times.append(turnaround_time) self.set_status("Done") - logger.info(f"{self.agent_name} has finished: average waiting time: {np.mean(np.array(waiting_times))} seconds, turnaround time: {np.mean(np.array(turnaround_times))} seconds\n") + self.logger.info(f"[{self.agent_name}] has finished: average waiting time: {np.mean(np.array(waiting_times))} seconds, turnaround time: {np.mean(np.array(turnaround_times))} seconds\n") - logger.info(f"[{self.agent_name}] {task_input} Final result is: {final_result}") + self.logger.info(f"[{self.agent_name}] {task_input} Final result is: {final_result}") return final_result @@ -153,4 +160,4 @@ def run(self): agent.run() # thread_pool.submit(agent.run) - # agent.run() \ No newline at end of file + # agent.run() diff --git a/src/agents/narrative_agent/narrative_agent.py b/src/agents/narrative_agent/narrative_agent.py index 7cc72860..536be9e5 100644 --- a/src/agents/narrative_agent/narrative_agent.py +++ b/src/agents/narrative_agent/narrative_agent.py @@ -6,10 +6,6 @@ AgentProcess ) -from src.utils.utils import ( - logger -) - import numpy as np import argparse @@ -17,8 +13,15 @@ from concurrent.futures import as_completed class NarrativeAgent(BaseAgent): - def __init__(self, agent_name, task_input, llm, agent_process_queue): - BaseAgent.__init__(self, agent_name, task_input, llm, agent_process_queue) + def __init__(self, + agent_name, + task_input, + llm, + agent_process_queue, + log_mode: str + ): + BaseAgent.__init__(self, agent_name, task_input, llm, agent_process_queue, log_mode) + print(self.log_mode) def run(self): waiting_times = [] @@ -28,9 +31,9 @@ def run(self): prompt += prefix task_input = self.task_input task_input = "The task you need to solve is: " + task_input - logger.info(f"[{self.agent_name}] {task_input}\n") + self.logger.info(f"[{self.agent_name}] {task_input}\n") prompt += task_input - + steps = [ "develop the story's setting and characters, establish a background and introduce the main characters.", "given the background and characters, create situations that lead to the rising action, develop the climax with a significant turning point, and then move towards the resolution.", @@ -40,7 +43,7 @@ def run(self): for i, step in enumerate(steps): prompt += f"\nIn step {i+1}, you need to {step}. Output should focus on current step and don't be verbose!" - logger.info(f"[{self.agent_name}] Step {i+1}: {step}\n") + self.logger.info(f"[{self.agent_name}] Step {i+1}: {step}\n") response, waiting_time, turnaround_time = self.get_response(prompt) waiting_times.append(waiting_time) @@ -48,7 +51,7 @@ def run(self): prompt += f"The solution to step {i+1} is: {response}\n" - logger.info(f"[{self.agent_name}] The solution to step {i+1}: {response}\n") + self.logger.info(f"[{self.agent_name}] The solution to step {i+1}: {response}\n") prompt += response @@ -59,11 +62,11 @@ def run(self): turnaround_times.append(turnaround_time) # return res # print(f"Average waiting time: {np.mean(np.array(waiting_times))}") - logger.info(f"{self.agent_name} has finished: average waiting time: {np.mean(np.array(waiting_times))} seconds, turnaround time: {np.mean(np.array(turnaround_times))} seconds\n") + self.logger.info(f"[{self.agent_name}] has finished: average waiting time: {np.mean(np.array(waiting_times))} seconds, turnaround time: {np.mean(np.array(turnaround_times))} seconds\n") # time.sleep(10) self.set_status("Done") - logger.info(f"[{self.agent_name}] {task_input} Final result is: {final_result}") + self.logger.info(f"[{self.agent_name}] {task_input} Final result is: {final_result}") return final_result @@ -76,5 +79,3 @@ def run(self): agent = NarrativeAgent(args.agent_name, args.task_input) agent.run() - # agent_thread_pool.submit(agent.run) - # agent.run() \ No newline at end of file diff --git a/src/agents/rec_agent/rec_agent.py b/src/agents/rec_agent/rec_agent.py index d10da1e2..472b0049 100644 --- a/src/agents/rec_agent/rec_agent.py +++ b/src/agents/rec_agent/rec_agent.py @@ -6,18 +6,20 @@ AgentProcess ) -from src.utils.utils import ( - logger -) - import argparse from concurrent.futures import as_completed import numpy as np class RecAgent(BaseAgent): - def __init__(self, agent_name, task_input, llm, agent_process_queue): - BaseAgent.__init__(self, agent_name, task_input, llm, agent_process_queue) + def __init__(self, + agent_name, + task_input, + llm, + agent_process_queue, + log_mode + ): + BaseAgent.__init__(self, agent_name, task_input, llm, agent_process_queue, log_mode) def run(self): prompt = "" @@ -25,7 +27,7 @@ def run(self): prompt += prefix task_input = self.task_input task_input = "The task you need to solve is: " + task_input - logger.info(f"[{self.agent_name}] {task_input}\n") + self.logger.info(f"[{self.agent_name}] {task_input}\n") prompt += task_input waiting_times = [] turnaround_times = [] @@ -37,7 +39,7 @@ def run(self): for i, step in enumerate(steps): prompt += f"\nIn step {i+1}, you need to {step}. Output should focus on current step and don't be verbose!" - logger.info(f"[{self.agent_name}] Step {i+1}: {step}\n") + self.logger.info(f"[{self.agent_name}] Step {i+1}: {step}\n") response, waiting_time, turnaround_time = self.get_response(prompt) waiting_times.append(waiting_time) @@ -45,17 +47,17 @@ def run(self): prompt += f"The solution to step {i+1} is: {response}\n" - logger.info(f"[{self.agent_name}] The solution to step {i+1}: {response}\n") + self.logger.info(f"[{self.agent_name}] The solution to step {i+1}: {response}\n") prompt += f"Given the interaction history: '{prompt}', give a final recommendation list and explanations, don't be verbose!" - final_result, waiting_time, turnaround_time = self.get_final_result(prompt) + final_result, waiting_time, turnaround_time = self.get_response(prompt) # time.sleep(10) self.set_status("Done") # print(f"Average waiting time: {np.mean(np.array(waiting_times))}") - logger.info(f"{self.agent_name} has finished: average waiting time: {np.mean(np.array(waiting_times))} seconds, turnaround time: {np.mean(np.array(turnaround_times))} seconds\n") + self.logger.info(f"[{self.agent_name}] has finished: average waiting time: {np.mean(np.array(waiting_times))} seconds, turnaround time: {np.mean(np.array(turnaround_times))} seconds\n") - logger.info(f"[{self.agent_name}] {task_input} Final result is: {final_result}") + self.logger.info(f"[{self.agent_name}] {task_input} Final result is: {final_result}") return final_result @@ -69,4 +71,4 @@ def parse_result(self, prompt): args = parser.parse_args() agent = RecAgent(args.agent_name, args.task_input) - agent.run() \ No newline at end of file + agent.run() diff --git a/src/command_executor.py b/src/command_executor.py index e04877fa..5339fa4c 100644 --- a/src/command_executor.py +++ b/src/command_executor.py @@ -3,15 +3,15 @@ import sys from src.utils.global_param import ( - thread_pool, - agent_process_queue, agent_table ) +from src.agents.agent_factory import AgentFactory + import subprocess class Executor: - def __init__(self, agent_factory): + def __init__(self, agent_factory: AgentFactory): # self.thread_pool = ThreadPoolExecutor(max_workers=MAX_WORKER_NUM) self.command_table = { "run": self.run_agent, @@ -25,13 +25,13 @@ def execute(self, command): command_type = command["command_type"] command_name = command["command_name"] command_body = command["command_body"] - + if command_type in self.command_table.keys(): self.command_table[command_type]( command_name, command_body ) - + else: return NotImplementedError @@ -41,25 +41,11 @@ def print_agent_memory(): def print(self, command_name = None, command_body = None): """List status of agent process.""" if command_name == "agent": - self.agent_factory.print() + self.agent_factory.print_agent() elif command_name == "agent-process": - agent_process_queue.print() - - def exit_shell(self): - """Exits the simulated shell.""" - print("Exiting shell.") - exit() - + return NotImplementedError + def run_agent(self, agent_name = None, task_input = None): - agent_program = agent_table[agent_name] - print(agent_program) - args = ["--agent_name", agent_name, "--task_input", task_input] - subprocess.run(["python3", "-m", agent_program] + args) - - # agent = self.agent_factory.activate_agent(agent_name, task_input) - # agent.run() - # runner = thread_pool.submit(agent.run) + agent = self.agent_factory.activate_agent(agent_name, task_input) - # print(len(agent_process_queue)) - - # deactivator = thread_pool.submit(self.agent_factory.deactivate_agent, agent) + self.agent_factory.agent_thread_pool.submit(agent.run) diff --git a/src/scheduler/base.py b/src/scheduler/base.py index 50cb4b6a..98ec6f69 100644 --- a/src/scheduler/base.py +++ b/src/scheduler/base.py @@ -1,28 +1,53 @@ from queue import Queue, Empty from threading import Thread -from src.utils.utils import ( - logger -) +import logging from src.agents.agent_process import AgentProcess import time +from datetime import datetime + +import os class BaseScheduler: - def __init__(self, llm): + def __init__(self, llm, log_mode): self.active = False # start/stop the scheduler + self.log_mode = log_mode + self.logger = self.setup_logger() self.thread = Thread(target=self.run) self.llm = llm def run(self): pass - + def start(self): """start the scheduler""" self.active = True self.thread.start() + def setup_logger(self): + logger = logging.getLogger(f"FIFO Scheduler Logger") + # logger.setLevel(logging.INFO) # Set the minimum logging level + logger.disabled = True + date_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # Provide two log modes: console and file + + if self.log_mode == "console": + handler = logging.StreamHandler() + handler.setLevel(logging.INFO) # Set logging level for console output + else: + assert self.log_mode == "file" + log_dir = os.path.join(os.getcwd(), "logs", "scheduler") + if not os.path.exists(log_dir): + os.makedirs(log_dir) + log_file = os.path.join(log_dir, f"{date_time}.txt") + handler = logging.FileHandler(log_file) + handler.setLevel(logging.INFO) # Set logging + + logger.addHandler(handler) # enabled when run in a simulated shell + return logger + def stop(self): """stop the scheduler""" self.active = False @@ -30,7 +55,7 @@ def stop(self): def execute_request(self, agent_process: AgentProcess): agent_process.set_status("Executing") - logger.info(f"[{agent_process.agent_name}] is executing.") + self.logger.info(f"[{agent_process.agent_name}] is executing.") agent_process.set_start_time(time.time()) response = self.llm.address_request(agent_process.prompt) agent_process.set_response(response) diff --git a/src/scheduler/fifo_scheduler.py b/src/scheduler/fifo_scheduler.py index 263d03ff..b40ac71b 100644 --- a/src/scheduler/fifo_scheduler.py +++ b/src/scheduler/fifo_scheduler.py @@ -1,20 +1,13 @@ from src.scheduler.base import BaseScheduler -import queue - -import time - from queue import Queue, Empty from threading import Thread class FIFOScheduler(BaseScheduler): - def __init__(self, llm): + def __init__(self, llm, log_mode): + super().__init__(llm, log_mode) self.agent_process_queue = Queue() - # start/stop the scheduler - self.active = False - # thread - self.thread = Thread(target=self.run) - self.llm = llm + def run(self): while self.active: @@ -22,4 +15,4 @@ def run(self): agent_request = self.agent_process_queue.get(block=True, timeout=1) self.execute_request(agent_request) except Empty: - pass \ No newline at end of file + pass diff --git a/src/utils/global_param.py b/src/utils/global_param.py index 4b3483b4..666963c6 100644 --- a/src/utils/global_param.py +++ b/src/utils/global_param.py @@ -4,14 +4,16 @@ from src.agents.rec_agent.rec_agent import RecAgent -global MAX_AID +from src.agents.travel_agent.travel_agent import TravelAgent + +global MAX_AID MAX_AID = 256 global aid_pool aid_pool = [False for i in range(MAX_AID)] -global agent_pool -agent_pool = {} +global current_agents +current_agents = {} global agent_execution_path agent_execution_path = { @@ -25,5 +27,6 @@ agent_table = { "MathAgent": MathAgent, "NarrativeAgent": NarrativeAgent, - "RecAgent": RecAgent -} \ No newline at end of file + "RecAgent": RecAgent, + "TravelAgent": TravelAgent +} diff --git a/src/utils/utils.py b/src/utils/utils.py index 42417c4d..86d757bd 100644 --- a/src/utils/utils.py +++ b/src/utils/utils.py @@ -19,6 +19,8 @@ def parse_global_args(): parser.add_argument('--max_gpu_memory', type=json.loads, help="Max gpu memory allocated for the LLM") parser.add_argument('--eval_device', type=str, help="Evaluation device") parser.add_argument('--max_new_tokens', type=int, default=256, help="The maximum number of new tokens for generation") + parser.add_argument("--scheduler_log_mode", type=str, default="console", choices=["console", "file"]) + parser.add_argument("--agent_log_mode", type=str, default="console", choices=["console", "file"]) return parser @@ -26,7 +28,6 @@ def extract_before_parenthesis(s: str) -> str: match = re.search(r'^(.*?)\([^)]*\)', s) return match.group(1) if match else s - def get_from_dict_or_env( data: Dict[str, Any], key: str, env_key: str, default: Optional[str] = None ) -> str: @@ -48,4 +49,15 @@ def get_from_env(env_key: str, default: Optional[str] = None) -> str: f"Did not find {env_key}, please add an environment variable" f" `{env_key}` which contains it. " ) - \ No newline at end of file + +class Logger: + def __init__(self, log_mode) -> None: + self.log_mode = log_mode + + def log(self, info, path=None): + if self.log_mode == "console": + print(info) + else: + assert self.log_mode == "file" + with open(path, "w") as w: + w.write(info + "\n")