Skip to content

Commit

Permalink
Don't limit the number of passages created from a source
Browse files Browse the repository at this point in the history
  • Loading branch information
Mindy Long committed Dec 11, 2024
1 parent f1213a0 commit 57571b8
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 16 deletions.
3 changes: 1 addition & 2 deletions letta/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,10 +1371,9 @@ def migrate_embedding(self, embedding_config: EmbeddingConfig):
# TODO: recall memory
raise NotImplementedError()

def attach_source(self, user: PydanticUser, source_id: str, source_manager: SourceManager, ms: MetadataStore):
def attach_source(self, user: PydanticUser, source_id: str, source_manager: SourceManager, ms: MetadataStore, page_size: Optional[int] = None):
"""Attach data with name `source_name` to the agent from source_connector."""
# TODO: eventually, adding a data source should just give access to the retriever the source table, rather than modifying archival memory
page_size = 100
passages = self.passage_manager.list_passages(actor=user, source_id=source_id, limit=page_size)

for passage in passages:
Expand Down
5 changes: 1 addition & 4 deletions letta/data_sources/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
from letta.schemas.source import Source
from letta.services.passage_manager import PassageManager
from letta.services.source_manager import SourceManager
from letta.utils import create_uuid_from_string


class DataConnector:
"""
Expand Down Expand Up @@ -42,7 +40,7 @@ def generate_passages(self, file: FileMetadata, chunk_size: int = 1024) -> Itera
"""


def load_data(connector: DataConnector, source: Source, passage_manager: PassageManager, source_manager: SourceManager, actor: "User", agent_id: Optional[str] = None):
def load_data(connector: DataConnector, source: Source, passage_manager: PassageManager, source_manager: SourceManager, actor: "User"):
"""Load data from a connector (generates file and passages) into a specified source_id, associated with a user_id."""
embedding_config = source.embedding_config

Expand Down Expand Up @@ -79,7 +77,6 @@ def load_data(connector: DataConnector, source: Source, passage_manager: Passage
continue

passage = Passage(
id=create_uuid_from_string(f"{str(source.id)}_{passage_text}"),
text=passage_text,
file_id=file_metadata.id,
source_id=source.id,
Expand Down
20 changes: 10 additions & 10 deletions letta/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1584,24 +1584,24 @@ def load_file_to_source(self, source_id: str, file_path: str, job_id: str, actor
from letta.data_sources.connectors import DirectoryConnector

source = self.source_manager.get_source_by_id(source_id=source_id)
if source is None:
raise ValueError(f"Source {source_id} does not exist")
connector = DirectoryConnector(input_files=[file_path])
num_passages, num_documents = self.load_data(user_id=source.created_by_id, source_name=source.name, connector=connector)

# update all agents who have this source attached
agent_ids = self.ms.list_attached_agents(source_id=source_id)
for agent_id in agent_ids:
agent = self.load_agent(agent_id=agent_id)
curr_passage_size = self.passage_manager.size(actor=actor, agent_id=agent_id, source_id=source_id)
agent.attach_source(user=actor, source_id=source_id, source_manager=self.source_manager, ms=self.ms)
new_passage_size = self.passage_manager.size(actor=actor, agent_id=agent_id, source_id=source_id)
assert new_passage_size > curr_passage_size

# update job status
job.status = JobStatus.completed
job.metadata_["num_passages"] = num_passages
job.metadata_["num_documents"] = num_documents
self.job_manager.update_job_by_id(job_id=job_id, job_update=JobUpdate(**job.model_dump()), actor=actor)

# update all agents who have this source attached
agent_ids = self.ms.list_attached_agents(source_id=source_id)
for agent_id in agent_ids:
agent = self.load_agent(agent_id=agent_id)
agent.attach_source(user=actor, source_id=source_id, source_manager=self.source_manager, ms=self.ms)
assert new_passage_size >= curr_passage_size # in case empty files are added

return job

def load_data(
Expand All @@ -1620,7 +1620,7 @@ def load_data(
raise ValueError(f"Data source {source_name} does not exist for user {user_id}")

# load data into the document store
passage_count, document_count = load_data(connector, source, self.passage_manager, self.source_manager, actor=user, agent_id=agent_id)
passage_count, document_count = load_data(connector, source, self.passage_manager, self.source_manager, actor=user)
return passage_count, document_count

def attach_source_to_agent(
Expand Down

0 comments on commit 57571b8

Please sign in to comment.