diff --git a/majority_voting_example.py b/majority_voting_example.py new file mode 100644 index 000000000..a2ba372ac --- /dev/null +++ b/majority_voting_example.py @@ -0,0 +1,34 @@ +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from swarms.structs.majority_voting import MajorityVoting +from dotenv import load_dotenv + +load_dotenv() + + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4o", + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=3, + context_length=8192, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + saved_state_path="agent_00.json", + interactive=False, +) + +swarm = MajorityVoting(agents=[agent, agent, agent]) + +swarm.run( + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", +) diff --git a/pyproject.toml b/pyproject.toml index 1c6c6926e..cb82760de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.1.1" +version = "7.1.2" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/agents/openai_assistant.py b/swarms/agents/openai_assistant.py index 2a29e1bf7..d256a7683 100644 --- a/swarms/agents/openai_assistant.py +++ b/swarms/agents/openai_assistant.py @@ -1,3 +1,4 @@ +from concurrent.futures import ThreadPoolExecutor import json import os import subprocess @@ -316,3 +317,20 @@ def run(self, task: str, *args, **kwargs) -> str: def call(self, task: str, *args, **kwargs) -> str: """Alias for run() to maintain compatibility with different agent interfaces.""" return self.run(task, *args, **kwargs) + + def batch_run( + self, tasks: List[str], *args, **kwargs + ) -> List[Any]: + """Run a batch of tasks using the OpenAI Assistant.""" + return [self.run(task, *args, **kwargs) for task in tasks] + + def run_concurrently( + self, tasks: List[str], *args, **kwargs + ) -> List[Any]: + """Run a batch of tasks concurrently using the OpenAI Assistant.""" + with ThreadPoolExecutor( + max_workers=os.cpu_count() + ) as executor: + return list( + executor.map(self.run, tasks, *args, **kwargs) + ) diff --git a/swarms/cli/onboarding_process.py b/swarms/cli/onboarding_process.py index edac11681..e279d9e3b 100644 --- a/swarms/cli/onboarding_process.py +++ b/swarms/cli/onboarding_process.py @@ -6,7 +6,7 @@ from swarms.utils.loguru_logger import initialize_logger -from swarms.telemetry.capture_sys_data import ( +from swarms.telemetry.main import ( capture_system_data, log_agent_data, ) diff --git a/swarms/prompts/prompt.py b/swarms/prompts/prompt.py index b8628b20e..ca6ec625f 100644 --- a/swarms/prompts/prompt.py +++ b/swarms/prompts/prompt.py @@ -11,7 +11,7 @@ ) from pydantic.v1 import validator -from swarms.telemetry.capture_sys_data import ( +from swarms.telemetry.main import ( capture_system_data, log_agent_data, ) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 4c80f222f..5d88260c7 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -76,15 +76,7 @@ star_swarm, ) from swarms.structs.task import Task -from swarms.structs.utils import ( - detect_markdown, - distribute_tasks, - extract_key_from_json, - extract_tokens_from_text, - find_agent_by_id, - find_token_in_text, - parse_tasks, -) + __all__ = [ "Agent", @@ -107,13 +99,6 @@ "RoundRobinSwarm", "SequentialWorkflow", "Task", - "detect_markdown", - "distribute_tasks", - "extract_key_from_json", - "extract_tokens_from_text", - "find_agent_by_id", - "find_token_in_text", - "parse_tasks", "MixtureOfAgents", "GraphWorkflow", "Node", diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 1d932dc19..873d89c97 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -52,7 +52,7 @@ from swarms.utils.wrapper_clusterop import ( exec_callable_with_clusterops, ) -from swarms.telemetry.capture_sys_data import log_agent_data +from swarms.telemetry.main import log_agent_data from swarms.agents.agent_print import agent_print from swarms.utils.litellm_tokenizer import count_tokens diff --git a/swarms/structs/agent_memory_manager.py b/swarms/structs/agent_memory_manager.py deleted file mode 100644 index 2b52013d4..000000000 --- a/swarms/structs/agent_memory_manager.py +++ /dev/null @@ -1,417 +0,0 @@ -import json -import logging -import time -import uuid -from datetime import datetime -from typing import Any, Dict, List, Optional - -import yaml -from pydantic import BaseModel -from swarms.utils.litellm_tokenizer import count_tokens - -logger = logging.getLogger(__name__) - - -class MemoryMetadata(BaseModel): - """Metadata for memory entries""" - - timestamp: Optional[float] = time.time() - role: Optional[str] = None - agent_name: Optional[str] = None - session_id: Optional[str] = None - memory_type: Optional[str] = None # 'short_term' or 'long_term' - token_count: Optional[int] = None - message_id: Optional[str] = str(uuid.uuid4()) - - -class MemoryEntry(BaseModel): - """Single memory entry with content and metadata""" - - content: Optional[str] = None - metadata: Optional[MemoryMetadata] = None - - -class MemoryConfig(BaseModel): - """Configuration for memory manager""" - - max_short_term_tokens: Optional[int] = 4096 - max_entries: Optional[int] = None - system_messages_token_buffer: Optional[int] = 1000 - enable_long_term_memory: Optional[bool] = False - auto_archive: Optional[bool] = True - archive_threshold: Optional[float] = 0.8 # Archive when 80% full - - -class MemoryManager: - """ - Manages both short-term and long-term memory for an agent, handling token limits, - archival, and context retrieval. - - Args: - config (MemoryConfig): Configuration for memory management - tokenizer (Optional[Any]): Tokenizer to use for token counting - long_term_memory (Optional[Any]): Vector store or database for long-term storage - """ - - def __init__( - self, - config: MemoryConfig, - tokenizer: Optional[Any] = None, - long_term_memory: Optional[Any] = None, - ): - self.config = config - self.tokenizer = tokenizer - self.long_term_memory = long_term_memory - - # Initialize memories - self.short_term_memory: List[MemoryEntry] = [] - self.system_messages: List[MemoryEntry] = [] - - # Memory statistics - self.total_tokens_processed: int = 0 - self.archived_entries_count: int = 0 - - def create_memory_entry( - self, - content: str, - role: str, - agent_name: str, - session_id: str, - memory_type: str = "short_term", - ) -> MemoryEntry: - """Create a new memory entry with metadata""" - metadata = MemoryMetadata( - timestamp=time.time(), - role=role, - agent_name=agent_name, - session_id=session_id, - memory_type=memory_type, - token_count=count_tokens(content), - ) - return MemoryEntry(content=content, metadata=metadata) - - def add_memory( - self, - content: str, - role: str, - agent_name: str, - session_id: str, - is_system: bool = False, - ) -> None: - """Add a new memory entry to appropriate storage""" - entry = self.create_memory_entry( - content=content, - role=role, - agent_name=agent_name, - session_id=session_id, - memory_type="system" if is_system else "short_term", - ) - - if is_system: - self.system_messages.append(entry) - else: - self.short_term_memory.append(entry) - - # Check if archiving is needed - if self.should_archive(): - self.archive_old_memories() - - self.total_tokens_processed += entry.metadata.token_count - - def get_current_token_count(self) -> int: - """Get total tokens in short-term memory""" - return sum( - entry.metadata.token_count - for entry in self.short_term_memory - ) - - def get_system_messages_token_count(self) -> int: - """Get total tokens in system messages""" - return sum( - entry.metadata.token_count - for entry in self.system_messages - ) - - def should_archive(self) -> bool: - """Check if archiving is needed based on configuration""" - if not self.config.auto_archive: - return False - - current_usage = ( - self.get_current_token_count() - / self.config.max_short_term_tokens - ) - return current_usage >= self.config.archive_threshold - - def archive_old_memories(self) -> None: - """Move older memories to long-term storage""" - if not self.long_term_memory: - logger.warning( - "No long-term memory storage configured for archiving" - ) - return - - while self.should_archive(): - # Get oldest non-system message - if not self.short_term_memory: - break - - oldest_entry = self.short_term_memory.pop(0) - - # Store in long-term memory - self.store_in_long_term_memory(oldest_entry) - self.archived_entries_count += 1 - - def store_in_long_term_memory(self, entry: MemoryEntry) -> None: - """Store a memory entry in long-term memory""" - if self.long_term_memory is None: - logger.warning( - "Attempted to store in non-existent long-term memory" - ) - return - - try: - self.long_term_memory.add(str(entry.model_dump())) - except Exception as e: - logger.error(f"Error storing in long-term memory: {e}") - # Re-add to short-term if storage fails - self.short_term_memory.insert(0, entry) - - def get_relevant_context( - self, query: str, max_tokens: Optional[int] = None - ) -> str: - """ - Get relevant context from both memory types - - Args: - query (str): Query to match against memories - max_tokens (Optional[int]): Maximum tokens to return - - Returns: - str: Combined relevant context - """ - contexts = [] - - # Add system messages first - for entry in self.system_messages: - contexts.append(entry.content) - - # Add short-term memory - for entry in reversed(self.short_term_memory): - contexts.append(entry.content) - - # Query long-term memory if available - if self.long_term_memory is not None: - long_term_context = self.long_term_memory.query(query) - if long_term_context: - contexts.append(str(long_term_context)) - - # Combine and truncate if needed - combined = "\n".join(contexts) - if max_tokens: - combined = self.truncate_to_token_limit( - combined, max_tokens - ) - - return combined - - def truncate_to_token_limit( - self, text: str, max_tokens: int - ) -> str: - """Truncate text to fit within token limit""" - current_tokens = count_tokens(text) - - if current_tokens <= max_tokens: - return text - - # Truncate by splitting into sentences and rebuilding - sentences = text.split(". ") - result = [] - current_count = 0 - - for sentence in sentences: - sentence_tokens = count_tokens(sentence) - if current_count + sentence_tokens <= max_tokens: - result.append(sentence) - current_count += sentence_tokens - else: - break - - return ". ".join(result) - - def clear_short_term_memory( - self, preserve_system: bool = True - ) -> None: - """Clear short-term memory with option to preserve system messages""" - if not preserve_system: - self.system_messages.clear() - self.short_term_memory.clear() - logger.info( - "Cleared short-term memory" - + " (preserved system messages)" - if preserve_system - else "" - ) - - def get_memory_stats(self) -> Dict[str, Any]: - """Get detailed memory statistics""" - return { - "short_term_messages": len(self.short_term_memory), - "system_messages": len(self.system_messages), - "current_tokens": self.get_current_token_count(), - "system_tokens": self.get_system_messages_token_count(), - "max_tokens": self.config.max_short_term_tokens, - "token_usage_percent": round( - ( - self.get_current_token_count() - / self.config.max_short_term_tokens - ) - * 100, - 2, - ), - "has_long_term_memory": self.long_term_memory is not None, - "archived_entries": self.archived_entries_count, - "total_tokens_processed": self.total_tokens_processed, - } - - def save_memory_snapshot(self, file_path: str) -> None: - """Save current memory state to file""" - try: - data = { - "timestamp": datetime.now().isoformat(), - "config": self.config.model_dump(), - "system_messages": [ - entry.model_dump() - for entry in self.system_messages - ], - "short_term_memory": [ - entry.model_dump() - for entry in self.short_term_memory - ], - "stats": self.get_memory_stats(), - } - - with open(file_path, "w") as f: - if file_path.endswith(".yaml"): - yaml.dump(data, f) - else: - json.dump(data, f, indent=2) - - logger.info(f"Saved memory snapshot to {file_path}") - - except Exception as e: - logger.error(f"Error saving memory snapshot: {e}") - raise - - def load_memory_snapshot(self, file_path: str) -> None: - """Load memory state from file""" - try: - with open(file_path, "r") as f: - if file_path.endswith(".yaml"): - data = yaml.safe_load(f) - else: - data = json.load(f) - - self.config = MemoryConfig(**data["config"]) - self.system_messages = [ - MemoryEntry(**entry) - for entry in data["system_messages"] - ] - self.short_term_memory = [ - MemoryEntry(**entry) - for entry in data["short_term_memory"] - ] - - logger.info(f"Loaded memory snapshot from {file_path}") - - except Exception as e: - logger.error(f"Error loading memory snapshot: {e}") - raise - - def search_memories( - self, query: str, memory_type: str = "all" - ) -> List[MemoryEntry]: - """ - Search through memories of specified type - - Args: - query (str): Search query - memory_type (str): Type of memories to search ("short_term", "system", "long_term", or "all") - - Returns: - List[MemoryEntry]: Matching memory entries - """ - results = [] - - if memory_type in ["short_term", "all"]: - results.extend( - [ - entry - for entry in self.short_term_memory - if query.lower() in entry.content.lower() - ] - ) - - if memory_type in ["system", "all"]: - results.extend( - [ - entry - for entry in self.system_messages - if query.lower() in entry.content.lower() - ] - ) - - if ( - memory_type in ["long_term", "all"] - and self.long_term_memory is not None - ): - long_term_results = self.long_term_memory.query(query) - if long_term_results: - # Convert long-term results to MemoryEntry format - for result in long_term_results: - content = str(result) - metadata = MemoryMetadata( - timestamp=time.time(), - role="long_term", - agent_name="system", - session_id="long_term", - memory_type="long_term", - token_count=count_tokens(content), - ) - results.append( - MemoryEntry( - content=content, metadata=metadata - ) - ) - - return results - - def get_memory_by_timeframe( - self, start_time: float, end_time: float - ) -> List[MemoryEntry]: - """Get memories within a specific timeframe""" - return [ - entry - for entry in self.short_term_memory - if start_time <= entry.metadata.timestamp <= end_time - ] - - def export_memories( - self, file_path: str, format: str = "json" - ) -> None: - """Export memories to file in specified format""" - data = { - "system_messages": [ - entry.model_dump() for entry in self.system_messages - ], - "short_term_memory": [ - entry.model_dump() for entry in self.short_term_memory - ], - "stats": self.get_memory_stats(), - } - - with open(file_path, "w") as f: - if format == "yaml": - yaml.dump(data, f) - else: - json.dump(data, f, indent=2) diff --git a/swarms/structs/company.py b/swarms/structs/company.py deleted file mode 100644 index f7fb36b72..000000000 --- a/swarms/structs/company.py +++ /dev/null @@ -1,177 +0,0 @@ -from dataclasses import dataclass, field -from typing import Dict, List, Optional, Union - -from swarms.structs.agent import Agent -from swarms.structs.base_swarm import BaseSwarm -from swarms.utils.loguru_logger import initialize_logger - - -logger = initialize_logger("company-swarm") - - -@dataclass -class Company(BaseSwarm): - """ - Represents a company with a hierarchical organizational structure. - """ - - org_chart: List[List[Agent]] - shared_instructions: str = None - ceo: Optional[Agent] = None - agents: List[Agent] = field(default_factory=list) - agent_interactions: Dict[str, List[str]] = field( - default_factory=dict - ) - - def __post_init__(self): - self._parse_org_chart(self.org_chart) - - def add(self, agent: Agent) -> None: - """ - Adds an agent to the company. - - Args: - agent (Agent): The agent to be added. - - Raises: - ValueError: If an agent with the same ID already exists in the company. - """ - try: - if any( - existing_agent.id == agent.id - for existing_agent in self.agents - ): - raise ValueError( - f"Agent with id {agent.id} already exists in the" - " company." - ) - self.agents.append(agent) - - except Exception as error: - logger.error( - f"[ERROR][CLASS: Company][METHOD: add] {error}" - ) - raise error - - def get(self, agent_name: str) -> Agent: - """ - Retrieves an agent from the company by name. - - Args: - agent_name (str): The name of the agent to retrieve. - - Returns: - Agent: The retrieved agent. - - Raises: - ValueError: If an agent with the specified name does not exist in the company. - """ - try: - for agent in self.agents: - if agent.name == agent_name: - return agent - raise ValueError( - f"Agent with name {agent_name} does not exist in the" - " company." - ) - except Exception as error: - logger.error( - f"[ERROR][CLASS: Company][METHOD: get] {error}" - ) - raise error - - def remove(self, agent: Agent) -> None: - """ - Removes an agent from the company. - - Args: - agent (Agent): The agent to be removed. - """ - try: - self.agents.remove(agent) - except Exception as error: - logger.error( - f"[ERROR][CLASS: Company][METHOD: remove] {error}" - ) - raise error - - def _parse_org_chart( - self, org_chart: Union[List[Agent], List[List[Agent]]] - ) -> None: - """ - Parses the organization chart and adds agents to the company. - - Args: - org_chart (Union[List[Agent], List[List[Agent]]]): The organization chart - representing the hierarchy of agents. - - Raises: - ValueError: If more than one CEO is found in the org chart or if an invalid - agent is encountered. - """ - try: - for node in org_chart: - if isinstance(node, Agent): - if self.ceo: - raise ValueError("1 CEO is only allowed") - self.ceo = node - self.add(node) - - elif isinstance(node, list): - for agent in node: - if not isinstance(agent, Agent): - raise ValueError( - "Invalid agent in org chart" - ) - self.add(agent) - - for i, agent in enumerate(node): - if i == len(node) - 1: - continue - - for other_agent in node[i + 1]: - self.__init_task(agent, other_agent) - except Exception as error: - logger.error( - "[ERROR][CLASS: Company][METHOD: _parse_org_chart]" - f" {error}" - ) - raise error - - def _init_interaction( - self, - agent1: Agent, - agent2: Agent, - ) -> None: - """ - Initializes the interaction between two agents. - - Args: - agent1 (Agent): The first agent involved in the interaction. - agent2 (Agent): The second agent involved in the interaction. - - Returns: - None - """ - if agent1.ai_name not in self.agents_interactions: - self.agents_interactions[agent1.ai_name] = [] - self.agents_interactions[agent1.ai_name].append( - agent2.ai_name - ) - - def run(self): - """ - Run the company - """ - for ( - agent_name, - interaction_agents, - ) in self.agents_interactions.items(): - agent = self.get(agent_name) - for interaction_agent in interaction_agents: - task_description = ( - f"Task for {agent_name} to interact with" - f" {interaction_agent}" - ) - print(f"{task_description} is being executed") - agent.run(task_description) diff --git a/swarms/structs/hiearchical_swarm.py b/swarms/structs/hiearchical_swarm.py index 94ebbaced..6244d18a3 100644 --- a/swarms/structs/hiearchical_swarm.py +++ b/swarms/structs/hiearchical_swarm.py @@ -13,7 +13,6 @@ logger = initialize_logger(log_folder="hierarchical_swarm") - class HierarchicalOrder(BaseModel): agent_name: str = Field( ..., diff --git a/swarms/structs/majority_voting.py b/swarms/structs/majority_voting.py index 18738aa0c..f8ee64cec 100644 --- a/swarms/structs/majority_voting.py +++ b/swarms/structs/majority_voting.py @@ -1,11 +1,16 @@ +import asyncio import concurrent.futures +import multiprocessing +import os import re from collections import Counter +from concurrent.futures import ThreadPoolExecutor from typing import Any, Callable, List, Optional from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation -from swarms.utils.file_processing import create_file +from swarms.structs.multi_agent_exec import run_agents_concurrently +from swarms.utils.formatter import formatter from swarms.utils.loguru_logger import initialize_logger logger = initialize_logger(log_folder="majority_voting") @@ -139,13 +144,17 @@ def __init__( description: str = "A majority voting system for agents", agents: List[Agent] = [], output_parser: Optional[Callable] = majority_voting, + consensus_agent: Optional[Agent] = None, autosave: bool = False, verbose: bool = False, *args, **kwargs, ): + self.name = name + self.description = description self.agents = agents self.output_parser = output_parser + self.consensus_agent = consensus_agent self.autosave = autosave self.verbose = verbose @@ -153,19 +162,17 @@ def __init__( time_enabled=True, *args, **kwargs ) - # If autosave is enabled, save the conversation to a file - if self.autosave: - create_file( - str(self.conversation), "majority_voting.json" - ) + self.initialize_majority_voting() + + def initialize_majority_voting(self): + + if self.agents is None: + raise ValueError("Agents list is empty") # Log the agents - logger.info("Initializing majority voting system") - # Length of agents - logger.info(f"Number of agents: {len(self.agents)}") - logger.info( - "Agents:" - f" {', '.join(agent.agent_name for agent in self.agents)}" + formatter.print_panel( + f"Initializing majority voting system\nNumber of agents: {len(self.agents)}\nAgents: {', '.join(agent.agent_name for agent in self.agents)}", + title="Majority Voting", ) def run(self, task: str, *args, **kwargs) -> List[Any]: @@ -181,29 +188,17 @@ def run(self, task: str, *args, **kwargs) -> List[Any]: List[Any]: The majority vote. """ - # Route to each agent - with concurrent.futures.ThreadPoolExecutor() as executor: - logger.info("Running agents concurrently") - - futures = [ - executor.submit(agent.run, task, *args) - for agent in self.agents - ] - results = [ - future.result() - for future in concurrent.futures.as_completed(futures) - ] + results = run_agents_concurrently( + self.agents, task, max_workers=os.cpu_count() + ) # Add responses to conversation and log them for agent, response in zip(self.agents, results): + response = ( response if isinstance(response, list) else [response] ) self.conversation.add(agent.agent_name, response) - logger.info( - f"[Agent][Name: {agent.agent_name}][Response:" - f" {response}]" - ) # Perform majority voting on the conversation responses = [ @@ -217,8 +212,87 @@ def run(self, task: str, *args, **kwargs) -> List[Any]: majority_vote = self.output_parser( responses, *args, **kwargs ) + elif self.consensus_agent is not None: + majority_vote = self.consensus_agent.run(responses) else: majority_vote = majority_voting(responses) # Return the majority vote return majority_vote + + def batch_run( + self, tasks: List[str], *args, **kwargs + ) -> List[Any]: + """ + Runs the majority voting system in batch mode. + + Args: + tasks (List[str]): List of tasks to be performed by the agents. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + + Returns: + List[Any]: List of majority votes for each task. + """ + return [self.run(task, *args, **kwargs) for task in tasks] + + def run_concurrently( + self, tasks: List[str], *args, **kwargs + ) -> List[Any]: + """ + Runs the majority voting system concurrently. + + Args: + tasks (List[str]): List of tasks to be performed by the agents. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + + Returns: + List[Any]: List of majority votes for each task. + """ + with ThreadPoolExecutor( + max_workers=os.cpu_count() + ) as executor: + futures = [ + executor.submit(self.run, task, *args, **kwargs) + for task in tasks + ] + return [ + future.result() + for future in concurrent.futures.as_completed(futures) + ] + + def run_concurrently_multiprocess( + self, tasks: List[str], *args, **kwargs + ) -> List[Any]: + """ + Runs the majority voting system concurrently using multiprocessing. + + Args: + tasks (List[str]): List of tasks to be performed by the agents. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + + Returns: + List[Any]: List of majority votes for each task. + """ + with multiprocessing.Pool(processes=os.cpu_count()) as pool: + return pool.map(self.run, tasks) + + async def run_async( + self, tasks: List[str], *args, **kwargs + ) -> List[Any]: + """ + Runs the majority voting system concurrently using asyncio. + + Args: + tasks (List[str]): List of tasks to be performed by the agents. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + + Returns: + List[Any]: List of majority votes for each task. + """ + return await asyncio.gather( + *[self.run(task, *args, **kwargs) for task in tasks] + ) diff --git a/swarms/structs/mixture_of_agents.py b/swarms/structs/mixture_of_agents.py index e91d565f9..5fcd419d5 100644 --- a/swarms/structs/mixture_of_agents.py +++ b/swarms/structs/mixture_of_agents.py @@ -5,7 +5,7 @@ from pydantic import BaseModel, Field from swarms.structs.agent import Agent -from swarms.telemetry.capture_sys_data import log_agent_data +from swarms.telemetry.main import log_agent_data from swarms.schemas.agent_step_schemas import ManySteps from swarms.prompts.ag_prompt import aggregator_system_prompt from swarms.utils.loguru_logger import initialize_logger diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index 1ee5add2f..884cd23c4 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -15,6 +15,13 @@ ) +@dataclass +class ResourceMetrics: + cpu_percent: float + memory_percent: float + active_threads: int + + def run_single_agent(agent: AgentType, task: str) -> Any: """Run a single agent synchronously""" return agent.run(task) @@ -79,7 +86,7 @@ def run_agents_concurrently( List of outputs from each agent """ # Optimize defaults based on system resources - cpu_cores = cpu_count() + cpu_cores = os.cpu_count() batch_size = batch_size or cpu_cores max_workers = max_workers or cpu_cores * 2 @@ -275,13 +282,6 @@ def run_agents_with_timeout( return results -@dataclass -class ResourceMetrics: - cpu_percent: float - memory_percent: float - active_threads: int - - def get_system_metrics() -> ResourceMetrics: """Get current system resource usage""" return ResourceMetrics( diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index 6be885bee..b281c93ee 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -16,7 +16,7 @@ from swarms.utils.wrapper_clusterop import ( exec_callable_with_clusterops, ) -from swarms.telemetry.capture_sys_data import log_agent_data +from swarms.telemetry.main import log_agent_data logger = initialize_logger(log_folder="rearrange") diff --git a/swarms/structs/spreadsheet_swarm.py b/swarms/structs/spreadsheet_swarm.py index bec809874..90bed7ebd 100644 --- a/swarms/structs/spreadsheet_swarm.py +++ b/swarms/structs/spreadsheet_swarm.py @@ -10,7 +10,7 @@ from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm -from swarms.telemetry.capture_sys_data import log_agent_data +from swarms.telemetry.main import log_agent_data from swarms.utils.file_processing import create_file_in_folder from swarms.utils.loguru_logger import initialize_logger diff --git a/swarms/structs/swarm_output_type.py b/swarms/structs/swarm_output_type.py index f2a85732c..15af50029 100644 --- a/swarms/structs/swarm_output_type.py +++ b/swarms/structs/swarm_output_type.py @@ -4,7 +4,7 @@ from pydantic import BaseModel, Field -class AgentResponde(BaseModel): +class AgentRespond(BaseModel): id: str = Field(default=uuid.uuid4().hex) timestamp: str = Field(default=time.time()) agent_position: int = Field(description="Agent in swarm position") @@ -18,6 +18,6 @@ class SwarmOutput(BaseModel): name: str = Field(description="Swarm name") description: str = Field(description="Swarm description") swarm_type: str = Field(description="Swarm type") - agent_outputs: List[AgentResponde] = Field( + agent_outputs: List[AgentRespond] = Field( description="List of agent responses" ) diff --git a/swarms/structs/utils.py b/swarms/structs/utils.py index 9ca3a8872..093745616 100644 --- a/swarms/structs/utils.py +++ b/swarms/structs/utils.py @@ -1,32 +1,7 @@ -import json -import re -from typing import Any, Dict, List, Optional - +from typing import List from swarms.structs.agent import Agent -# Helper functions for manager/corporate agents -def parse_tasks( - task: str = None, -) -> Dict[str, Any]: - """Parse tasks - - Args: - task (str, optional): _description_. Defaults to None. - - Returns: - Dict[str, Any]: _description_ - """ - tasks = {} - for line in task.split("\n"): - if line.startswith("") and line.endwith( - "" - ): - agent_id, task = line[10:-11].split("><") - tasks[agent_id] = task - return tasks - - def find_agent_by_id( agent_id: str = None, agents: List[Agent] = None, @@ -43,106 +18,51 @@ def find_agent_by_id( Returns: Agent: _description_ """ - for agent in agents: - if agent.id == agent_id: - if task: - return agent.run(task, *args, **kwargs) - else: - return agent - - return None - - -def distribute_tasks( - task: str = None, agents: List[Agent] = None, *args, **kwargs -): - """Distribute tasks to agents - - Args: - task (str, optional): _description_. Defaults to None. - agents (List[Agent], optional): _description_. Defaults to None. - """ - # Parse the task to extract tasks and agent id - tasks = parse_tasks(task) - - # Distribute tasks to agents - for agent_id, task in tasks.item(): - assigned_agent = find_agent_by_id(agent_id, agents) - if assigned_agent: - print(f"Assigning task {task} to agent {agent_id}") - output = assigned_agent.run(task, *args, **kwargs) - print(f"Output from agent {agent_id}: {output}") - else: - print( - f"No agent found with ID {agent_id}. Task '{task}' is" - " not assigned." - ) - - -def find_token_in_text(text: str, token: str = "") -> bool: - """ - Parse a block of text for a specific token. - - Args: - text (str): The text to parse. - token (str): The token to find. - - Returns: - bool: True if the token is found in the text, False otherwise. - """ - # Check if the token is in the text - if token in text: - return True - else: - return False - - -def extract_key_from_json( - json_response: str, key: str -) -> Optional[str]: - """ - Extract a specific key from a JSON response. - - Args: - json_response (str): The JSON response to parse. - key (str): The key to extract. - - Returns: - Optional[str]: The value of the key if it exists, None otherwise. - """ - response_dict = json.loads(json_response) - return response_dict.get(key) - - -def extract_tokens_from_text( - text: str, tokens: List[str] -) -> List[str]: - """ - Extract a list of tokens from a text response. + try: + print(f"Searching for agent with ID: {agent_id}") + for agent in agents: + if agent.id == agent_id: + print(f"Found agent with ID {agent_id}") + if task: + print(f"Running task: {task}") + return agent.run(task, *args, **kwargs) + else: + return agent + print(f"No agent found with ID {agent_id}") + return None + except Exception as e: + print(f"Error finding agent by ID: {str(e)}") + return None + + +def find_agent_by_name( + agent_name: str = None, + agents: List[Agent] = None, + task: str = None, + *args, + **kwargs, +) -> Agent: + """Find agent by name Args: - text (str): The text to parse. - tokens (List[str]): The tokens to extract. + agent_name (str): _description_ + agents (List[Agent]): _description_ Returns: - List[str]: The tokens that were found in the text. - """ - return [token for token in tokens if token in text] - - -def detect_markdown(text: str) -> bool: - """ - Checks if a string contains Markdown code enclosed in six backticks. - - Parameters - ---------- - text : str - The text to check. - - Returns - ------- - bool - True if the text contains Markdown code enclosed in six backticks, False otherwise. + Agent: _description_ """ - pattern = r"``````[\s\S]*?``````" - return bool(re.search(pattern, text)) + try: + print(f"Searching for agent with name: {agent_name}") + for agent in agents: + if agent.name == agent_name: + print(f"Found agent with name {agent_name}") + if task: + print(f"Running task: {task}") + return agent.run(task, *args, **kwargs) + else: + return agent + print(f"No agent found with name {agent_name}") + return None + except Exception as e: + print(f"Error finding agent by name: {str(e)}") + return None diff --git a/swarms/structs/workspace_manager.py b/swarms/structs/workspace_manager.py index cec3615d9..8840c8929 100644 --- a/swarms/structs/workspace_manager.py +++ b/swarms/structs/workspace_manager.py @@ -41,7 +41,7 @@ def _create_env_file(self, env_file_path: Path) -> None: env_file_path (Path): The path to the .env file. """ with env_file_path.open("w") as file: - file.write("WORKSPACE_DIR=agent_workspace\n") + file.write(f"WORKSPACE_DIR={self.workspace_dir}\n") logger.info( "Created a new .env file with default WORKSPACE_DIR." ) @@ -57,7 +57,7 @@ def _append_to_env_file(self, env_file_path: Path) -> None: content = file.read() if "WORKSPACE_DIR" not in content: file.seek(0, os.SEEK_END) - file.write("WORKSPACE_DIR=agent_workspace\n") + file.write(f"WORKSPACE_DIR={self.workspace_dir}\n") logger.info("Appended WORKSPACE_DIR to .env file.") def _get_workspace_dir( @@ -150,6 +150,8 @@ def run(self) -> None: try: # Check if .env file exists and create it if it doesn't env_file_path = Path(".env") + + # If the .env file doesn't exist, create it if not env_file_path.exists(): self._create_env_file(env_file_path) else: diff --git a/swarms/telemetry/__init__.py b/swarms/telemetry/__init__.py index a3c966dd2..9792f266e 100644 --- a/swarms/telemetry/__init__.py +++ b/swarms/telemetry/__init__.py @@ -1,19 +1,17 @@ -from swarms.telemetry.sys_info import ( +from swarms.telemetry.main import ( + generate_unique_identifier, + generate_user_id, get_cpu_info, + get_machine_id, get_os_version, get_package_mismatches, get_pip_version, get_python_version, get_ram_info, get_swarms_verison, - system_info, -) -from swarms.telemetry.user_utils import ( - generate_unique_identifier, - generate_user_id, - get_machine_id, get_system_info, get_user_device_data, + system_info, ) __all__ = [ diff --git a/swarms/telemetry/capture_sys_data.py b/swarms/telemetry/capture_sys_data.py deleted file mode 100644 index 62b079a3c..000000000 --- a/swarms/telemetry/capture_sys_data.py +++ /dev/null @@ -1,87 +0,0 @@ -import os -import platform -import socket -import psutil -import uuid -from typing import Dict -import requests - -from swarms.utils.loguru_logger import initialize_logger - -logger = initialize_logger(log_folder="capture_sys_data") - - -def capture_system_data() -> Dict[str, str]: - """ - Captures extensive system data including platform information, user ID, IP address, CPU count, - memory information, and other system details. - - Returns: - Dict[str, str]: A dictionary containing system data. - """ - try: - system_data = { - "platform": platform.system(), - "platform_version": platform.version(), - "platform_release": platform.release(), - "hostname": socket.gethostname(), - "ip_address": socket.gethostbyname(socket.gethostname()), - "cpu_count": psutil.cpu_count(logical=True), - "memory_total": f"{psutil.virtual_memory().total / (1024 ** 3):.2f} GB", - "memory_available": f"{psutil.virtual_memory().available / (1024 ** 3):.2f} GB", - "user_id": str(uuid.uuid4()), # Unique user identifier - "machine_type": platform.machine(), - "processor": platform.processor(), - "architecture": platform.architecture()[0], - } - - # Get external IP address - try: - system_data["external_ip"] = requests.get( - "https://api.ipify.org" - ).text - except Exception: - system_data["external_ip"] = "N/A" - - return system_data - except Exception as e: - logger.error("Failed to capture system data: {}", e) - return {} - - -def log_agent_data(data_dict: dict) -> dict | None: - """ - Silently logs agent data to the Swarms database with retry logic. - - Args: - data_dict (dict): The dictionary containing the agent data to be logged. - - Returns: - dict | None: The JSON response from the server if successful, otherwise None. - """ - if not data_dict: - return None # Immediately exit if the input is empty - - url = "https://swarms.world/api/get-agents/log-agents" - headers = { - "Content-Type": "application/json", - "Authorization": os.getenv("SWARMS_API_KEY"), - } - - try: - response = requests.post( - url, json=data_dict, headers=headers, timeout=10 - ) - if ( - response.ok and response.text.strip() - ): # Check if response is valid and non-empty - return ( - response.json() - ) # Parse and return the JSON response - except ( - requests.exceptions.RequestException, - requests.exceptions.JSONDecodeError, - ): - pass # Fail silently without any action - - return None # Return None if anything goes wrong diff --git a/swarms/telemetry/main.py b/swarms/telemetry/main.py new file mode 100644 index 000000000..53bfa884c --- /dev/null +++ b/swarms/telemetry/main.py @@ -0,0 +1,304 @@ +import hashlib +import os +import platform +import socket +import subprocess +import uuid +from typing import Dict + +import pkg_resources +import psutil +import requests +import toml + +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="capture_sys_data") + + +# Helper functions +def generate_user_id(): + """Generate user id + + Returns: + _type_: _description_ + """ + return str(uuid.uuid4()) + + +def get_machine_id(): + """Get machine id + + Returns: + _type_: _description_ + """ + raw_id = platform.node() + hashed_id = hashlib.sha256(raw_id.encode()).hexdigest() + return hashed_id + + +def get_system_info(): + """ + Gathers basic system information. + + Returns: + dict: A dictionary containing system-related information. + """ + info = { + "platform": platform.system(), + "platform_release": platform.release(), + "platform_version": platform.version(), + "architecture": platform.machine(), + "hostname": socket.gethostname(), + "ip_address": socket.gethostbyname(socket.gethostname()), + "mac_address": ":".join( + [ + f"{(uuid.getnode() >> elements) & 0xFF:02x}" + for elements in range(0, 2 * 6, 8) + ][::-1] + ), + "processor": platform.processor(), + "python_version": platform.python_version(), + "Misc": system_info(), + } + return info + + +def generate_unique_identifier(): + """Generate unique identifier + + Returns: + str: unique id + + """ + system_info = get_system_info() + unique_id = uuid.uuid5(uuid.NAMESPACE_DNS, str(system_info)) + return str(unique_id) + + +def get_local_ip(): + """Get local ip + + Returns: + str: local ip + + """ + return socket.gethostbyname(socket.gethostname()) + + +def get_user_device_data(): + data = { + "ID": generate_user_id(), + "Machine ID": get_machine_id(), + "System Info": get_system_info(), + "UniqueID": generate_unique_identifier(), + } + return data + + +def get_python_version(): + return platform.python_version() + + +def get_pip_version() -> str: + """Get pip version + + Returns: + str: The version of pip installed + """ + try: + pip_version = ( + subprocess.check_output(["pip", "--version"]) + .decode() + .split()[1] + ) + except Exception as e: + pip_version = str(e) + return pip_version + + +def get_swarms_verison() -> tuple[str, str]: + """Get swarms version from both command line and package + + Returns: + tuple[str, str]: A tuple containing (command line version, package version) + """ + try: + swarms_verison_cmd = ( + subprocess.check_output(["swarms", "--version"]) + .decode() + .split()[1] + ) + except Exception as e: + swarms_verison_cmd = str(e) + swarms_verison_pkg = pkg_resources.get_distribution( + "swarms" + ).version + swarms_verison = swarms_verison_cmd, swarms_verison_pkg + return swarms_verison + + +def get_os_version() -> str: + """Get operating system version + + Returns: + str: The operating system version and platform details + """ + return platform.platform() + + +def get_cpu_info() -> str: + """Get CPU information + + Returns: + str: The processor information + """ + return platform.processor() + + +def get_ram_info() -> str: + """Get RAM information + + Returns: + str: A formatted string containing total, used and free RAM in GB + """ + vm = psutil.virtual_memory() + used_ram_gb = vm.used / (1024**3) + free_ram_gb = vm.free / (1024**3) + total_ram_gb = vm.total / (1024**3) + return ( + f"{total_ram_gb:.2f} GB, used: {used_ram_gb:.2f}, free:" + f" {free_ram_gb:.2f}" + ) + + +def get_package_mismatches(file_path: str = "pyproject.toml") -> str: + """Get package version mismatches between pyproject.toml and installed packages + + Args: + file_path (str, optional): Path to pyproject.toml file. Defaults to "pyproject.toml". + + Returns: + str: A formatted string containing package version mismatches + """ + with open(file_path) as file: + pyproject = toml.load(file) + dependencies = pyproject["tool"]["poetry"]["dependencies"] + dev_dependencies = pyproject["tool"]["poetry"]["group"]["dev"][ + "dependencies" + ] + dependencies.update(dev_dependencies) + + installed_packages = { + pkg.key: pkg.version for pkg in pkg_resources.working_set + } + + mismatches = [] + for package, version_info in dependencies.items(): + if isinstance(version_info, dict): + version_info = version_info["version"] + installed_version = installed_packages.get(package) + if installed_version and version_info.startswith("^"): + expected_version = version_info[1:] + if not installed_version.startswith(expected_version): + mismatches.append( + f"\t {package}: Mismatch," + f" pyproject.toml={expected_version}," + f" pip={installed_version}" + ) + else: + mismatches.append(f"\t {package}: Not found in pip list") + + return "\n" + "\n".join(mismatches) + + +def system_info() -> dict[str, str]: + """Get system information including Python, pip, OS, CPU and RAM details + + Returns: + dict[str, str]: A dictionary containing system information + """ + return { + "Python Version": get_python_version(), + "Pip Version": get_pip_version(), + # "Swarms Version": swarms_verison, + "OS Version and Architecture": get_os_version(), + "CPU Info": get_cpu_info(), + "RAM Info": get_ram_info(), + } + + +def capture_system_data() -> Dict[str, str]: + """ + Captures extensive system data including platform information, user ID, IP address, CPU count, + memory information, and other system details. + + Returns: + Dict[str, str]: A dictionary containing system data. + """ + try: + system_data = { + "platform": platform.system(), + "platform_version": platform.version(), + "platform_release": platform.release(), + "hostname": socket.gethostname(), + "ip_address": socket.gethostbyname(socket.gethostname()), + "cpu_count": psutil.cpu_count(logical=True), + "memory_total": f"{psutil.virtual_memory().total / (1024 ** 3):.2f} GB", + "memory_available": f"{psutil.virtual_memory().available / (1024 ** 3):.2f} GB", + "user_id": str(uuid.uuid4()), # Unique user identifier + "machine_type": platform.machine(), + "processor": platform.processor(), + "architecture": platform.architecture()[0], + } + + # Get external IP address + try: + system_data["external_ip"] = requests.get( + "https://api.ipify.org" + ).text + except Exception: + system_data["external_ip"] = "N/A" + + return system_data + except Exception as e: + logger.error("Failed to capture system data: {}", e) + return {} + + +def log_agent_data(data_dict: dict) -> dict | None: + """ + Silently logs agent data to the Swarms database with retry logic. + + Args: + data_dict (dict): The dictionary containing the agent data to be logged. + + Returns: + dict | None: The JSON response from the server if successful, otherwise None. + """ + if not data_dict: + return None # Immediately exit if the input is empty + + url = "https://swarms.world/api/get-agents/log-agents" + headers = { + "Content-Type": "application/json", + "Authorization": os.getenv("SWARMS_API_KEY"), + } + + try: + response = requests.post( + url, json=data_dict, headers=headers, timeout=10 + ) + if ( + response.ok and response.text.strip() + ): # Check if response is valid and non-empty + return ( + response.json() + ) # Parse and return the JSON response + except ( + requests.exceptions.RequestException, + requests.exceptions.JSONDecodeError, + ): + pass # Fail silently without any action + + return None # Return None if anything goes wrong diff --git a/swarms/telemetry/sys_info.py b/swarms/telemetry/sys_info.py deleted file mode 100644 index 2739362f4..000000000 --- a/swarms/telemetry/sys_info.py +++ /dev/null @@ -1,138 +0,0 @@ -import platform -import subprocess - -import pkg_resources -import psutil -import toml - - -def get_python_version(): - return platform.python_version() - - -def get_pip_version() -> str: - """Get pip version - - Returns: - str: The version of pip installed - """ - try: - pip_version = ( - subprocess.check_output(["pip", "--version"]) - .decode() - .split()[1] - ) - except Exception as e: - pip_version = str(e) - return pip_version - - -def get_swarms_verison() -> tuple[str, str]: - """Get swarms version from both command line and package - - Returns: - tuple[str, str]: A tuple containing (command line version, package version) - """ - try: - swarms_verison_cmd = ( - subprocess.check_output(["swarms", "--version"]) - .decode() - .split()[1] - ) - except Exception as e: - swarms_verison_cmd = str(e) - swarms_verison_pkg = pkg_resources.get_distribution( - "swarms" - ).version - swarms_verison = swarms_verison_cmd, swarms_verison_pkg - return swarms_verison - - -def get_os_version() -> str: - """Get operating system version - - Returns: - str: The operating system version and platform details - """ - return platform.platform() - - -def get_cpu_info() -> str: - """Get CPU information - - Returns: - str: The processor information - """ - return platform.processor() - - -def get_ram_info() -> str: - """Get RAM information - - Returns: - str: A formatted string containing total, used and free RAM in GB - """ - vm = psutil.virtual_memory() - used_ram_gb = vm.used / (1024**3) - free_ram_gb = vm.free / (1024**3) - total_ram_gb = vm.total / (1024**3) - return ( - f"{total_ram_gb:.2f} GB, used: {used_ram_gb:.2f}, free:" - f" {free_ram_gb:.2f}" - ) - - -def get_package_mismatches(file_path: str = "pyproject.toml") -> str: - """Get package version mismatches between pyproject.toml and installed packages - - Args: - file_path (str, optional): Path to pyproject.toml file. Defaults to "pyproject.toml". - - Returns: - str: A formatted string containing package version mismatches - """ - with open(file_path) as file: - pyproject = toml.load(file) - dependencies = pyproject["tool"]["poetry"]["dependencies"] - dev_dependencies = pyproject["tool"]["poetry"]["group"]["dev"][ - "dependencies" - ] - dependencies.update(dev_dependencies) - - installed_packages = { - pkg.key: pkg.version for pkg in pkg_resources.working_set - } - - mismatches = [] - for package, version_info in dependencies.items(): - if isinstance(version_info, dict): - version_info = version_info["version"] - installed_version = installed_packages.get(package) - if installed_version and version_info.startswith("^"): - expected_version = version_info[1:] - if not installed_version.startswith(expected_version): - mismatches.append( - f"\t {package}: Mismatch," - f" pyproject.toml={expected_version}," - f" pip={installed_version}" - ) - else: - mismatches.append(f"\t {package}: Not found in pip list") - - return "\n" + "\n".join(mismatches) - - -def system_info() -> dict[str, str]: - """Get system information including Python, pip, OS, CPU and RAM details - - Returns: - dict[str, str]: A dictionary containing system information - """ - return { - "Python Version": get_python_version(), - "Pip Version": get_pip_version(), - # "Swarms Version": swarms_verison, - "OS Version and Architecture": get_os_version(), - "CPU Info": get_cpu_info(), - "RAM Info": get_ram_info(), - } diff --git a/swarms/telemetry/user_utils.py b/swarms/telemetry/user_utils.py deleted file mode 100644 index 9da52a4c8..000000000 --- a/swarms/telemetry/user_utils.py +++ /dev/null @@ -1,86 +0,0 @@ -import hashlib -import platform -import socket -import uuid - -from swarms.telemetry.sys_info import system_info - - -# Helper functions -def generate_user_id(): - """Generate user id - - Returns: - _type_: _description_ - """ - return str(uuid.uuid4()) - - -def get_machine_id(): - """Get machine id - - Returns: - _type_: _description_ - """ - raw_id = platform.node() - hashed_id = hashlib.sha256(raw_id.encode()).hexdigest() - return hashed_id - - -def get_system_info(): - """ - Gathers basic system information. - - Returns: - dict: A dictionary containing system-related information. - """ - info = { - "platform": platform.system(), - "platform_release": platform.release(), - "platform_version": platform.version(), - "architecture": platform.machine(), - "hostname": socket.gethostname(), - "ip_address": socket.gethostbyname(socket.gethostname()), - "mac_address": ":".join( - [ - f"{(uuid.getnode() >> elements) & 0xFF:02x}" - for elements in range(0, 2 * 6, 8) - ][::-1] - ), - "processor": platform.processor(), - "python_version": platform.python_version(), - "Misc": system_info(), - } - return info - - -def generate_unique_identifier(): - """Generate unique identifier - - Returns: - str: unique id - - """ - system_info = get_system_info() - unique_id = uuid.uuid5(uuid.NAMESPACE_DNS, str(system_info)) - return str(unique_id) - - -def get_local_ip(): - """Get local ip - - Returns: - str: local ip - - """ - return socket.gethostbyname(socket.gethostname()) - - -def get_user_device_data(): - data = { - "ID": generate_user_id(), - "Machine ID": get_machine_id(), - "System Info": get_system_info(), - "UniqueID": generate_unique_identifier(), - } - return data diff --git a/swarms/utils/__init__.py b/swarms/utils/__init__.py index 49cd47fd6..5b321ba3a 100644 --- a/swarms/utils/__init__.py +++ b/swarms/utils/__init__.py @@ -1,4 +1,3 @@ -from swarms.utils.class_args_wrapper import print_class_parameters from swarms.utils.data_to_text import ( csv_to_text, data_to_text, @@ -20,7 +19,6 @@ __all__ = [ - "print_class_parameters", "csv_to_text", "data_to_text", "json_to_text", diff --git a/swarms/utils/async_file_creation.py b/swarms/utils/async_file_creation.py deleted file mode 100644 index 6c35e95dd..000000000 --- a/swarms/utils/async_file_creation.py +++ /dev/null @@ -1,106 +0,0 @@ -# In order to accelerate the ops of creating files, we use the async file creation method. -import os -import asyncio -from aiofiles import open as aio_open -from typing import List - - -async def async_create_file(file_path: str, content: str) -> None: - """ - Asynchronously creates a file at the specified path and writes the given content to it. - - Args: - file_path (str): The path where the file will be created. - content (str): The content to be written to the file. - - Returns: - None - """ - async with aio_open(file_path, "w") as file: - await file.write(content) - - -async def create_multiple_files( - file_paths: List[str], contents: List[str] -) -> None: - """ - Asynchronously creates multiple files at the specified paths and writes the corresponding content to each file. - - Args: - file_paths (List[str]): A list of paths where the files will be created. - contents (List[str]): A list of content to be written to each file, corresponding to the file paths. - - Returns: - None - """ - tasks = [ - async_create_file(file_path, content) - for file_path, content in zip(file_paths, contents) - ] - await asyncio.gather(*tasks) - - -async def create_file_with_directory( - file_path: str, content: str -) -> None: - """ - Creates a file with the specified directory path and content. If the directory does not exist, it is created. - - Args: - file_path (str): The path of the file to be created, including the directory. - content (str): The content to be written to the file. - - Returns: - None - """ - directory = os.path.dirname(file_path) - if not os.path.exists(directory): - os.makedirs(directory) - - await async_create_file(file_path, content) - - -def sync_create_file(file_path: str, content: str) -> None: - """ - Synchronously creates a file at the specified path and writes the given content to it. - - Args: - file_path (str): The path where the file will be created. - content (str): The content to be written to the file. - - Returns: - None - """ - asyncio.run(async_create_file(file_path, content)) - - -def sync_create_multiple_files( - file_paths: List[str], contents: List[str] -) -> None: - """ - Synchronously creates multiple files at the specified paths and writes the corresponding content to each file. - - Args: - file_paths (List[str]): A list of paths where the files will be created. - contents (List[str]): A list of content to be written to each file, corresponding to the file paths. - - Returns: - None - """ - asyncio.run(create_multiple_files(file_paths, contents)) - - -def sync_create_file_with_directory( - file_path: str, content: str -) -> None: - """ - Synchronously creates a file with the specified directory path and content. If the directory does not exist, it is created. - - Args: - file_path (str): The path of the file to be created, including the directory. - content (str): The content to be written to the file. - - Returns: - None - """ - asyncio.run(create_file_with_directory(file_path, content)) diff --git a/swarms/utils/class_args_wrapper.py b/swarms/utils/class_args_wrapper.py deleted file mode 100644 index f24932cfc..000000000 --- a/swarms/utils/class_args_wrapper.py +++ /dev/null @@ -1,36 +0,0 @@ -import inspect - - -def print_class_parameters(cls, api_format: bool = False): - """ - Print the parameters of a class constructor. - - Parameters: - cls (type): The class to inspect. - - Example: - >>> print_class_parameters(Agent) - Parameter: x, Type: - Parameter: y, Type: - """ - try: - # Get the parameters of the class constructor - sig = inspect.signature(cls.__init__) - params = sig.parameters - - if api_format: - param_dict = {} - for name, param in params.items(): - if name == "self": - continue - param_dict[name] = str(param.annotation) - return param_dict - - # Print the parameters - for name, param in params.items(): - if name == "self": - continue - print(f"Parameter: {name}, Type: {param.annotation}") - - except Exception as e: - print(f"An error occurred while inspecting the class: {e}") diff --git a/swarms/utils/update_agent_system_prompts.py b/swarms/utils/update_agent_system_prompts.py deleted file mode 100644 index e6f82426e..000000000 --- a/swarms/utils/update_agent_system_prompts.py +++ /dev/null @@ -1,53 +0,0 @@ -import concurrent.futures -from typing import List, Union -from swarms.structs.agent import Agent - - -def update_system_prompts( - agents: List[Union[Agent, str]], - prompt: str, -) -> List[Agent]: - """ - Update system prompts for a list of agents concurrently. - - Args: - agents: List of Agent objects or strings to update - prompt: The prompt text to append to each agent's system prompt - - Returns: - List of updated Agent objects - """ - if not agents: - return agents - - def update_agent_prompt(agent: Union[Agent, str]) -> Agent: - # Convert string to Agent if needed - if isinstance(agent, str): - agent = Agent( - agent_name=agent, - system_prompt=prompt, # Initialize with the provided prompt - ) - else: - # Preserve existing prompt and append new one - existing_prompt = ( - agent.system_prompt if agent.system_prompt else "" - ) - agent.system_prompt = existing_prompt + "\n" + prompt - return agent - - # Use ThreadPoolExecutor for concurrent execution - max_workers = min(len(agents), 4) # Reasonable thread count - with concurrent.futures.ThreadPoolExecutor( - max_workers=max_workers - ) as executor: - futures = [] - for agent in agents: - future = executor.submit(update_agent_prompt, agent) - futures.append(future) - - # Collect results as they complete - updated_agents = [] - for future in concurrent.futures.as_completed(futures): - updated_agents.append(future.result()) - - return updated_agents diff --git a/tests/telemetry/test_user_utils.py b/tests/telemetry/test_user_utils.py index 96f32378f..d1f724046 100644 --- a/tests/telemetry/test_user_utils.py +++ b/tests/telemetry/test_user_utils.py @@ -1,6 +1,6 @@ import uuid -from swarms.telemetry.user_utils import ( +from swarms.telemetry.main import ( generate_unique_identifier, generate_user_id, get_machine_id, diff --git a/trader_swarm.py b/trader_swarm.py new file mode 100644 index 000000000..e69de29bb