diff --git a/letta/agent.py b/letta/agent.py index 98e6f9ff1e..4060c3637e 100644 --- a/letta/agent.py +++ b/letta/agent.py @@ -1371,10 +1371,9 @@ def migrate_embedding(self, embedding_config: EmbeddingConfig): # TODO: recall memory raise NotImplementedError() - def attach_source(self, user: PydanticUser, source_id: str, source_manager: SourceManager, ms: MetadataStore): + def attach_source(self, user: PydanticUser, source_id: str, source_manager: SourceManager, ms: MetadataStore, page_size: Optional[int] = None): """Attach data with name `source_name` to the agent from source_connector.""" # TODO: eventually, adding a data source should just give access to the retriever the source table, rather than modifying archival memory - page_size = 100 passages = self.passage_manager.list_passages(actor=user, source_id=source_id, limit=page_size) for passage in passages: diff --git a/letta/data_sources/connectors.py b/letta/data_sources/connectors.py index d64901d0c3..eb6b61ed65 100644 --- a/letta/data_sources/connectors.py +++ b/letta/data_sources/connectors.py @@ -13,8 +13,6 @@ from letta.schemas.source import Source from letta.services.passage_manager import PassageManager from letta.services.source_manager import SourceManager -from letta.utils import create_uuid_from_string - class DataConnector: """ @@ -42,7 +40,7 @@ def generate_passages(self, file: FileMetadata, chunk_size: int = 1024) -> Itera """ -def load_data(connector: DataConnector, source: Source, passage_manager: PassageManager, source_manager: SourceManager, actor: "User", agent_id: Optional[str] = None): +def load_data(connector: DataConnector, source: Source, passage_manager: PassageManager, source_manager: SourceManager, actor: "User"): """Load data from a connector (generates file and passages) into a specified source_id, associated with a user_id.""" embedding_config = source.embedding_config @@ -79,7 +77,6 @@ def load_data(connector: DataConnector, source: Source, passage_manager: Passage continue 
passage = Passage( - id=create_uuid_from_string(f"{str(source.id)}_{passage_text}"), text=passage_text, file_id=file_metadata.id, source_id=source.id, diff --git a/letta/server/server.py b/letta/server/server.py index 31734e29d2..f9474ebe46 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -1584,24 +1584,24 @@ def load_file_to_source(self, source_id: str, file_path: str, job_id: str, actor from letta.data_sources.connectors import DirectoryConnector source = self.source_manager.get_source_by_id(source_id=source_id) + if source is None: + raise ValueError(f"Source {source_id} does not exist") connector = DirectoryConnector(input_files=[file_path]) num_passages, num_documents = self.load_data(user_id=source.created_by_id, source_name=source.name, connector=connector) - # update all agents who have this source attached - agent_ids = self.ms.list_attached_agents(source_id=source_id) - for agent_id in agent_ids: - agent = self.load_agent(agent_id=agent_id) - curr_passage_size = self.passage_manager.size(actor=actor, agent_id=agent_id, source_id=source_id) - agent.attach_source(user=actor, source_id=source_id, source_manager=self.source_manager, ms=self.ms) - new_passage_size = self.passage_manager.size(actor=actor, agent_id=agent_id, source_id=source_id) - assert new_passage_size > curr_passage_size - # update job status job.status = JobStatus.completed job.metadata_["num_passages"] = num_passages job.metadata_["num_documents"] = num_documents self.job_manager.update_job_by_id(job_id=job_id, job_update=JobUpdate(**job.model_dump()), actor=actor) + # update all agents who have this source attached + agent_ids = self.ms.list_attached_agents(source_id=source_id) + for agent_id in agent_ids: + agent = self.load_agent(agent_id=agent_id) + curr_passage_size = self.passage_manager.size(actor=actor, agent_id=agent_id, source_id=source_id) + agent.attach_source(user=actor, source_id=source_id, source_manager=self.source_manager, ms=self.ms) + new_passage_size = self.passage_manager.size(actor=actor, agent_id=agent_id, source_id=source_id) + assert new_passage_size >= curr_passage_size # in case empty files are added + return job def load_data( @@ -1620,7
+1620,7 @@ def load_data( raise ValueError(f"Data source {source_name} does not exist for user {user_id}") # load data into the document store - passage_count, document_count = load_data(connector, source, self.passage_manager, self.source_manager, actor=user, agent_id=agent_id) + passage_count, document_count = load_data(connector, source, self.passage_manager, self.source_manager, actor=user) return passage_count, document_count def attach_source_to_agent(