From 23b1f806569651a99b04fc4be3088e06ed69217b Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Wed, 14 Jan 2026 14:04:54 -0800 Subject: [PATCH 01/18] Generalize ingestion status to a string, and mark/skip failed ingestions --- src/typeagent/knowpro/conversation_base.py | 14 ++++++++--- src/typeagent/knowpro/interfaces_storage.py | 6 ++++- src/typeagent/storage/memory/provider.py | 15 +++++++++++- src/typeagent/storage/memory/semrefindex.py | 2 +- src/typeagent/storage/sqlite/provider.py | 27 +++++++++++++++++---- src/typeagent/storage/sqlite/schema.py | 3 ++- tools/ingest_email.py | 6 +++-- 7 files changed, 59 insertions(+), 14 deletions(-) diff --git a/src/typeagent/knowpro/conversation_base.py b/src/typeagent/knowpro/conversation_base.py index b2476cb..114de52 100644 --- a/src/typeagent/knowpro/conversation_base.py +++ b/src/typeagent/knowpro/conversation_base.py @@ -141,13 +141,21 @@ async def add_messages_with_indexing( Exception: Any error """ storage = await self.settings.get_storage_provider() + if source_ids: + if len(source_ids) != len(messages): + raise ValueError( + f"Length of source_ids {len(source_ids)} " + f"must match length of messages {len(messages)}" + ) + async with storage: + for source_id in source_ids: + storage.mark_source_ingested(source_id, "failed") async with storage: - # Mark source IDs as ingested before adding messages - # This way, if indexing fails, the rollback will also undo the marks + # Mark source IDs as ingested (will be rolled back on error) if source_ids: for source_id in source_ids: - storage.mark_source_ingested(source_id) + storage.mark_source_ingested(source_id, "ingested") start_points = IndexingStartPoints( message_count=await self.messages.size(), diff --git a/src/typeagent/knowpro/interfaces_storage.py b/src/typeagent/knowpro/interfaces_storage.py index e6d2070..0d2e7f6 100644 --- a/src/typeagent/knowpro/interfaces_storage.py +++ b/src/typeagent/knowpro/interfaces_storage.py @@ -150,7 +150,11 @@ def is_source_ingested(self, source_id: str) -> bool: """Check if a source has already been ingested.""" ... - def mark_source_ingested(self, source_id: str) -> None: + def get_source_status(self, source_id: str) -> str | None: + """Get the ingestion status of a source.""" + ... + + def mark_source_ingested(self, source_id: str, status: str = "ingested") -> None: """Mark a source as ingested (no commit; call within transaction context).""" ... diff --git a/src/typeagent/storage/memory/provider.py b/src/typeagent/storage/memory/provider.py index e80286e..0cee2ed 100644 --- a/src/typeagent/storage/memory/provider.py +++ b/src/typeagent/storage/memory/provider.py @@ -150,7 +150,20 @@ def is_source_ingested(self, source_id: str) -> bool: """ return source_id in self._ingested_sources - def mark_source_ingested(self, source_id: str) -> None: + def get_source_status(self, source_id: str) -> str | None: + """Get the ingestion status of a source. + + Args: + source_id: External source identifier (email ID, file path, etc.) + + Returns: + The ingestion status if the source has been ingested, None otherwise. + """ + if source_id in self._ingested_sources: + return "ingested" + return None + + def mark_source_ingested(self, source_id: str, status: str = "ingested") -> None: """Mark a source as ingested. Args: diff --git a/src/typeagent/storage/memory/semrefindex.py b/src/typeagent/storage/memory/semrefindex.py index 3aa650a..ec44c87 100644 --- a/src/typeagent/storage/memory/semrefindex.py +++ b/src/typeagent/storage/memory/semrefindex.py @@ -136,7 +136,7 @@ async def add_batch_to_semantic_ref_index_from_list[ for i, knowledge_result in enumerate(knowledge_results): if isinstance(knowledge_result, Failure): raise RuntimeError( - f"Knowledge extraction failed: {knowledge_result.message}" + f"Knowledge extraction failed: {knowledge_result.message:.150}" ) text_location = batch[i] knowledge = knowledge_result.value diff --git a/src/typeagent/storage/sqlite/provider.py b/src/typeagent/storage/sqlite/provider.py index ac4e52b..fc82a4d 100644 --- a/src/typeagent/storage/sqlite/provider.py +++ b/src/typeagent/storage/sqlite/provider.py @@ -625,11 +625,28 @@ def is_source_ingested(self, source_id: str) -> bool: """ cursor = self.db.cursor() cursor.execute( - "SELECT 1 FROM IngestedSources WHERE source_id = ?", (source_id,) + "SELECT status FROM IngestedSources WHERE source_id = ?", (source_id,) ) - return cursor.fetchone() is not None + row = cursor.fetchone() + return row is not None and row[0] == "ingested" + + def get_source_status(self, source_id: str) -> str | None: + """Get the ingestion status of a source. + + Args: + source_id: External source identifier (email ID, file path, etc.) + + Returns: + The status string if the source exists, or None if it hasn't been ingested. + """ + cursor = self.db.cursor() + cursor.execute( + "SELECT status FROM IngestedSources WHERE source_id = ?", (source_id,) + ) + row = cursor.fetchone() + return row[0] if row else None - def mark_source_ingested(self, source_id: str) -> None: + def mark_source_ingested(self, source_id: str, status: str = "ingested") -> None: """Mark a source as ingested. This performs an INSERT but does NOT commit. It should be called within @@ -641,6 +658,6 @@ def mark_source_ingested(self, source_id: str) -> None: """ cursor = self.db.cursor() cursor.execute( - "INSERT OR IGNORE INTO IngestedSources (source_id) VALUES (?)", - (source_id,), + "INSERT OR IGNORE INTO IngestedSources (source_id, status) VALUES (?, ?)", + (source_id, status), ) diff --git a/src/typeagent/storage/sqlite/schema.py b/src/typeagent/storage/sqlite/schema.py index 4ec25b8..f928333 100644 --- a/src/typeagent/storage/sqlite/schema.py +++ b/src/typeagent/storage/sqlite/schema.py @@ -143,7 +143,8 @@ # This prevents re-ingesting the same content on subsequent runs INGESTED_SOURCES_SCHEMA = """ CREATE TABLE IF NOT EXISTS IngestedSources ( - source_id TEXT PRIMARY KEY -- External source identifier (email ID, file path, etc.) + source_id TEXT PRIMARY KEY, -- External source identifier (email ID, file path, etc.) + status TEXT NOT NULL DEFAULT 'ingested' -- Status of the source (e.g., 'ingested', 'failed') ); """ diff --git a/tools/ingest_email.py b/tools/ingest_email.py index 34b65f7..350dae3 100644 --- a/tools/ingest_email.py +++ b/tools/ingest_email.py @@ -148,12 +148,14 @@ async def ingest_emails( email = import_email_from_file(str(email_file)) email_id = email.metadata.id + if verbose: + print(f" Imported email ID: {email_id}") # Check if this email was already ingested - if email_id and storage_provider.is_source_ingested(email_id): + if email_id and (status := storage_provider.get_source_status(email_id)): skipped_count += 1 if verbose: - print(f" [Already ingested, skipping]") + print(f" [Previously {status}, skipping]") continue if verbose: From 3547f19540c0f9e189987c4413e589f07db64910 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 10:03:03 -0800 Subject: [PATCH 02/18] Improve database speed (see #166) --- src/typeagent/aitools/utils.py | 8 +++++--- src/typeagent/aitools/vectorbase.py | 5 ++++- src/typeagent/knowpro/convknowledge.py | 5 +++++ src/typeagent/knowpro/fuzzyindex.py | 7 +++---- src/typeagent/storage/sqlite/messageindex.py | 2 +- src/typeagent/storage/sqlite/provider.py | 13 ++++++++----- src/typeagent/storage/sqlite/reltermsindex.py | 17 +++++++++++------ tests/test_utils.py | 2 +- tools/ingest_email.py | 16 ++++++++++++---- 9 files changed, 50 insertions(+), 25 deletions(-) diff --git a/src/typeagent/aitools/utils.py b/src/typeagent/aitools/utils.py index b1a263d..e66ab4c 100644 --- a/src/typeagent/aitools/utils.py +++ b/src/typeagent/aitools/utils.py @@ -22,15 +22,17 @@ def timelog(label: str, verbose: bool = True): """Context manager to log the time taken by a block of code. With verbose=False it prints nothing.""" + dim = colorama.Style.DIM + reset = colorama.Style.RESET_ALL + if verbose: + print(f"{dim}{label}...{reset}", end="", flush=True) start_time = time.time() try: yield finally: elapsed_time = time.time() - start_time if verbose: - dim = colorama.Style.DIM - reset = colorama.Style.RESET_ALL - print(f"{dim}{elapsed_time:.3f}s -- {label}{reset}") + print(f"{dim} {elapsed_time:.3f}s{reset}") def pretty_print(obj: object, prefix: str = "", suffix: str = "") -> None: diff --git a/src/typeagent/aitools/vectorbase.py b/src/typeagent/aitools/vectorbase.py index 1ea7ce9..96ec25e 100644 --- a/src/typeagent/aitools/vectorbase.py +++ b/src/typeagent/aitools/vectorbase.py @@ -93,10 +93,13 @@ def add_embedding( if key is not None: self._model.add_embedding(key, embedding) - def add_embeddings(self, embeddings: NormalizedEmbeddings) -> None: + def add_embeddings(self, keys: None | list[str], embeddings: NormalizedEmbeddings) -> None: assert embeddings.ndim == 2 assert embeddings.shape[1] == self._embedding_size self._vectors = np.concatenate((self._vectors, embeddings), axis=0) + if keys is not None: + for key, embedding in zip(keys, embeddings): + self._model.add_embedding(key, embedding) async def add_key(self, key: str, cache: bool = True) -> None: embeddings = (await self.get_embedding(key, cache=cache)).reshape(1, -1) diff --git a/src/typeagent/knowpro/convknowledge.py b/src/typeagent/knowpro/convknowledge.py index c445328..be882c4 100644 --- a/src/typeagent/knowpro/convknowledge.py +++ b/src/typeagent/knowpro/convknowledge.py @@ -13,6 +13,9 @@ # TODO: Move ModelWrapper and create_typechat_model() to aitools package. +# TODO: Make this a parameter that can be configured (e.g. from command line). +DEFAULT_TIMEOUT_SECONDS = 30 + class ModelWrapper(typechat.TypeChatLanguageModel): def __init__( self, @@ -34,6 +37,7 @@ async def complete( key_name = "AZURE_OPENAI_API_KEY" env[key_name] = api_key self.base_model = typechat.create_language_model(env) + self.base_model.timeout_seconds = DEFAULT_TIMEOUT_SECONDS return await self.base_model.complete(prompt) @@ -46,6 +50,7 @@ def create_typechat_model() -> typechat.TypeChatLanguageModel: shared_token_provider = auth.get_shared_token_provider() env[key_name] = shared_token_provider.get_token() model = typechat.create_language_model(env) + model.timeout_seconds = DEFAULT_TIMEOUT_SECONDS if shared_token_provider is not None: model = ModelWrapper(model, shared_token_provider) return model diff --git a/src/typeagent/knowpro/fuzzyindex.py b/src/typeagent/knowpro/fuzzyindex.py index 44bea04..6ace1b3 100644 --- a/src/typeagent/knowpro/fuzzyindex.py +++ b/src/typeagent/knowpro/fuzzyindex.py @@ -22,10 +22,9 @@ def __init__( # Use VectorBase for storage and operations on embeddings. self._vector_base = VectorBase(settings) - # Initialize with embeddings if provided. + # Add embeddings to vectorbase if provided. if embeddings is not None: - for embedding in embeddings: - self._vector_base.add_embedding(None, embedding) + self._vector_base.add_embeddings(None, embeddings) def __len__(self) -> int: return len(self._vector_base) @@ -43,7 +42,7 @@ def get(self, pos: int) -> NormalizedEmbedding: return self._vector_base.get_embedding_at(pos) def push(self, embeddings: NormalizedEmbeddings) -> None: - self._vector_base.add_embeddings(embeddings) + self._vector_base.add_embeddings(None, embeddings) async def add_texts(self, texts: list[str]) -> None: await self._vector_base.add_keys(texts) diff --git a/src/typeagent/storage/sqlite/messageindex.py b/src/typeagent/storage/sqlite/messageindex.py index f5cbd13..d4cc210 100644 --- a/src/typeagent/storage/sqlite/messageindex.py +++ b/src/typeagent/storage/sqlite/messageindex.py @@ -383,7 +383,7 @@ async def deserialize(self, data: interfaces.MessageTextIndexData) -> None: ) # Update VectorBase - self._vectorbase.add_embeddings(embeddings) + self._vectorbase.add_embeddings(None, embeddings) async def clear(self) -> None: """Clear the message text index.""" diff --git a/src/typeagent/storage/sqlite/provider.py b/src/typeagent/storage/sqlite/provider.py index fc82a4d..2b72bb6 100644 --- a/src/typeagent/storage/sqlite/provider.py +++ b/src/typeagent/storage/sqlite/provider.py @@ -6,6 +6,8 @@ from datetime import datetime, timezone import sqlite3 +from typeagent.aitools import utils + from ...aitools.embeddings import AsyncEmbeddingModel from ...aitools.vectorbase import TextEmbeddingIndexSettings from ...knowpro import interfaces @@ -87,11 +89,12 @@ def __init__( self._term_to_semantic_ref_index = SqliteTermToSemanticRefIndex(self.db) self._property_index = SqlitePropertyIndex(self.db) self._timestamp_index = SqliteTimestampToTextRangeIndex(self.db) - self._message_text_index = SqliteMessageTextIndex( - self.db, - self.message_text_index_settings, - self._message_collection, - ) + with utils.timelog("Initializing message text index"): + self._message_text_index = SqliteMessageTextIndex( + self.db, + self.message_text_index_settings, + self._message_collection, + ) # Initialize related terms index self._related_terms_index = SqliteRelatedTermsIndex( self.db, self.related_term_index_settings.embedding_index_settings diff --git a/src/typeagent/storage/sqlite/reltermsindex.py b/src/typeagent/storage/sqlite/reltermsindex.py index e56221a..5fe3dd6 100644 --- a/src/typeagent/storage/sqlite/reltermsindex.py +++ b/src/typeagent/storage/sqlite/reltermsindex.py @@ -5,7 +5,11 @@ import sqlite3 -from ...aitools.embeddings import NormalizedEmbeddings +import numpy as np + +from typeagent.aitools import utils + +from ...aitools.embeddings import NormalizedEmbedding from ...aitools.vectorbase import TextEmbeddingIndexSettings, VectorBase from ...knowpro import interfaces from .schema import deserialize_embedding, serialize_embedding @@ -145,13 +149,13 @@ def __init__(self, db: sqlite3.Connection, settings: TextEmbeddingIndexSettings) "SELECT term, term_embedding FROM RelatedTermsFuzzy ORDER BY term" ) rows = cursor.fetchall() + embeddings: list[NormalizedEmbedding] = [] for term, blob in rows: assert blob is not None, term - embedding: NormalizedEmbeddings = deserialize_embedding(blob) - # Add to VectorBase at the correct ordinal - self._vector_base.add_embedding(term, embedding) self._terms_list.append(term) - self._added_terms.add(term) + embeddings.append(deserialize_embedding(blob)) + # Bulk add embeddings to VectorBase + self._vector_base.add_embeddings(None, np.array(embeddings)) async def lookup_term( self, @@ -308,7 +312,8 @@ def __init__(self, db: sqlite3.Connection, settings: TextEmbeddingIndexSettings) self.db = db # Initialize alias and fuzzy related terms indexes self._aliases = SqliteRelatedTermsAliases(db) - self._fuzzy_index = SqliteRelatedTermsFuzzy(db, settings) + with utils.timelog("Initializing fuzzy related terms index"): + self._fuzzy_index = SqliteRelatedTermsFuzzy(db, settings) @property def aliases(self) -> interfaces.ITermToRelatedTerms: diff --git a/tests/test_utils.py b/tests/test_utils.py index 1a9f7d7..d8c6715 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -14,7 +14,7 @@ def test_timelog(): with utils.timelog("test block"): pass out = buf.getvalue() - assert "s -- test block" in out + assert "test block..." in out def test_pretty_print(): diff --git a/tools/ingest_email.py b/tools/ingest_email.py index 350dae3..8ff3eb4 100644 --- a/tools/ingest_email.py +++ b/tools/ingest_email.py @@ -14,6 +14,15 @@ python query.py --database email.db --query "What was discussed?" """ +""" +TODO + +- Find out why storage creation is slow when db is large (Issue #166) +- Use filename as source ID (so we skip parsing if already ingested) +- Catch auth errors and stop rather than marking as failed +- Collect knowledge outside db transaction to reduce lock time +""" + import argparse import asyncio from pathlib import Path @@ -74,7 +83,7 @@ def collect_email_files(paths: list[str], verbose: bool) -> list[Path]: elif path.is_dir(): eml_files = sorted(path.glob("*.eml")) if verbose: - print(f" Found {len(eml_files)} .eml files in {path}") + print(f"Found {len(eml_files)} .eml files in {path}") email_files.extend(eml_files) else: print(f"Error: Not a file or directory: {path}", file=sys.stderr) @@ -91,9 +100,8 @@ async def ingest_emails( """Ingest email files into a database.""" # Collect all .eml files - if verbose: - print("Collecting email files...") - email_files = collect_email_files(paths, verbose) + with utils.timelog("Collecting email files"): + email_files = collect_email_files(paths, verbose) if not email_files: print("Error: No .eml files found", file=sys.stderr) From eed216a5e0a83bf5f728c7d050828595536ab570 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 12:44:20 -0800 Subject: [PATCH 03/18] Add test for add_embeddings --- tests/test_vectorbase.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_vectorbase.py b/tests/test_vectorbase.py index 22e3440..62abd39 100644 --- a/tests/test_vectorbase.py +++ b/tests/test_vectorbase.py @@ -48,6 +48,26 @@ def test_add_embedding(vector_base: VectorBase, sample_embeddings: Samples): np.testing.assert_array_equal(vector_base.serialize_embedding_at(i), embedding) +def test_add_embeddings(vector_base: VectorBase, sample_embeddings: Samples): + """Adding multiple embeddings at once matches repeated single adds.""" + keys = list(sample_embeddings.keys()) + for key, embedding in sample_embeddings.items(): + vector_base.add_embedding(key, embedding) + + bulk_vector_base = make_vector_base() + stacked_embeddings = np.stack([sample_embeddings[key] for key in keys], axis=0) + bulk_vector_base.add_embeddings(keys, stacked_embeddings) + + assert len(bulk_vector_base) == len(vector_base) + np.testing.assert_array_equal(bulk_vector_base.serialize(), vector_base.serialize()) + + sequential_cache = vector_base._model._embedding_cache + bulk_cache = bulk_vector_base._model._embedding_cache + assert set(sequential_cache.keys()) == set(bulk_cache.keys()) + for key in keys: + np.testing.assert_array_equal(bulk_cache[key], sequential_cache[key]) + + @pytest.mark.asyncio async def test_add_key(vector_base: VectorBase, sample_embeddings: Samples): """Test adding keys to the VectorBase.""" From b3516e87fe0c30f10be8810ef49f4d55ca802de9 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 12:54:56 -0800 Subject: [PATCH 04/18] Send timelog to stderr, to avoid breaking MCP protocol --- src/typeagent/aitools/utils.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/typeagent/aitools/utils.py b/src/typeagent/aitools/utils.py index e66ab4c..c5c11e5 100644 --- a/src/typeagent/aitools/utils.py +++ b/src/typeagent/aitools/utils.py @@ -8,6 +8,7 @@ import os import re import shutil +import sys import time import black @@ -25,14 +26,23 @@ def timelog(label: str, verbose: bool = True): dim = colorama.Style.DIM reset = colorama.Style.RESET_ALL if verbose: - print(f"{dim}{label}...{reset}", end="", flush=True) + print( + f"{dim}{label}...{reset}", + end="", + flush=True, + file=sys.stderr, + ) start_time = time.time() try: yield finally: elapsed_time = time.time() - start_time if verbose: - print(f"{dim} {elapsed_time:.3f}s{reset}") + print( + f"{dim} {elapsed_time:.3f}s{reset}", + file=sys.stderr, + flush=True, + ) def pretty_print(obj: object, prefix: str = "", suffix: str = "") -> None: From 32f3abb78e570ab5169562161abbeffab168e2e8 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 12:56:08 -0800 Subject: [PATCH 05/18] Formatted vectorbase.py and convknowledge.py --- src/typeagent/aitools/vectorbase.py | 4 +++- src/typeagent/knowpro/convknowledge.py | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/typeagent/aitools/vectorbase.py b/src/typeagent/aitools/vectorbase.py index 96ec25e..3bbc572 100644 --- a/src/typeagent/aitools/vectorbase.py +++ b/src/typeagent/aitools/vectorbase.py @@ -93,7 +93,9 @@ def add_embedding( if key is not None: self._model.add_embedding(key, embedding) - def add_embeddings(self, keys: None | list[str], embeddings: NormalizedEmbeddings) -> None: + def add_embeddings( + self, keys: None | list[str], embeddings: NormalizedEmbeddings + ) -> None: assert embeddings.ndim == 2 assert embeddings.shape[1] == self._embedding_size self._vectors = np.concatenate((self._vectors, embeddings), axis=0) diff --git a/src/typeagent/knowpro/convknowledge.py b/src/typeagent/knowpro/convknowledge.py index be882c4..59f9adf 100644 --- a/src/typeagent/knowpro/convknowledge.py +++ b/src/typeagent/knowpro/convknowledge.py @@ -16,6 +16,7 @@ # TODO: Make this a parameter that can be configured (e.g. from command line). DEFAULT_TIMEOUT_SECONDS = 30 + class ModelWrapper(typechat.TypeChatLanguageModel): def __init__( self, From c0c39560c9b2409f94af17bd0f1335403e28cf9d Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 13:48:52 -0800 Subject: [PATCH 06/18] Fix test_timelog (capture stderr) --- tests/test_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_utils.py b/tests/test_utils.py index d8c6715..84bd6ee 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,7 +1,7 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -from contextlib import redirect_stdout +from contextlib import redirect_stderr, redirect_stdout from io import StringIO import os @@ -10,7 +10,7 @@ def test_timelog(): buf = StringIO() - with redirect_stdout(buf): + with redirect_stderr(buf): with utils.timelog("test block"): pass out = buf.getvalue() From 8a4ee7fdd37c26cda2e365034cbdc54531cb3751 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 13:51:08 -0800 Subject: [PATCH 07/18] Allow updating existing source_id status --- src/typeagent/storage/sqlite/provider.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/typeagent/storage/sqlite/provider.py b/src/typeagent/storage/sqlite/provider.py index 2b72bb6..26e8b34 100644 --- a/src/typeagent/storage/sqlite/provider.py +++ b/src/typeagent/storage/sqlite/provider.py @@ -58,6 +58,7 @@ def __init__( self.db = sqlite3.connect(db_path) # Configure SQLite for optimal bulk insertion performance + # TODO: Move into init_db_schema() self.db.execute("PRAGMA foreign_keys = ON") # Improve write performance for bulk operations self.db.execute("PRAGMA synchronous = NORMAL") # Faster than FULL, still safe @@ -661,6 +662,6 @@ def mark_source_ingested(self, source_id: str, status: str = "ingested") -> None """ cursor = self.db.cursor() cursor.execute( - "INSERT OR IGNORE INTO IngestedSources (source_id, status) VALUES (?, ?)", + "INSERT OR REPLACE INTO IngestedSources (source_id, status) VALUES (?, ?)", (source_id, status), ) From 1cff9c08cbdd188f1e273b4aba9ea8ca1e086eed Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 13:54:49 -0800 Subject: [PATCH 08/18] Remove quadratic code in SqliteMessageTextIndex --- src/typeagent/storage/sqlite/messageindex.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/typeagent/storage/sqlite/messageindex.py b/src/typeagent/storage/sqlite/messageindex.py index d4cc210..4245169 100644 --- a/src/typeagent/storage/sqlite/messageindex.py +++ b/src/typeagent/storage/sqlite/messageindex.py @@ -34,8 +34,11 @@ def __init__( if self._size(): cursor = self.db.cursor() cursor.execute("SELECT embedding FROM MessageTextIndex") - for row in cursor.fetchall(): - self._vectorbase.add_embedding(None, deserialize_embedding(row[0])) + rows = cursor.fetchall() + if rows: + embeddings: list[NormalizedEmbedding] = [deserialize_embedding(row[0]) for row in rows] + embeddings_array = np.stack(embeddings, axis=0).astype(np.float32, copy=False) + self._vectorbase.add_embeddings(None, embeddings_array) async def size(self) -> int: return self._size() From 3524065ed12d81b8e2e3114f2f88ed94d3088cf1 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 13:55:20 -0800 Subject: [PATCH 09/18] Remove unnecessary timelog calls --- src/typeagent/storage/sqlite/provider.py | 11 +++++------ src/typeagent/storage/sqlite/reltermsindex.py | 3 +-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/typeagent/storage/sqlite/provider.py b/src/typeagent/storage/sqlite/provider.py index 26e8b34..125da29 100644 --- a/src/typeagent/storage/sqlite/provider.py +++ b/src/typeagent/storage/sqlite/provider.py @@ -90,12 +90,11 @@ def __init__( self._term_to_semantic_ref_index = SqliteTermToSemanticRefIndex(self.db) self._property_index = SqlitePropertyIndex(self.db) self._timestamp_index = SqliteTimestampToTextRangeIndex(self.db) - with utils.timelog("Initializing message text index"): - self._message_text_index = SqliteMessageTextIndex( - self.db, - self.message_text_index_settings, - self._message_collection, - ) + self._message_text_index = SqliteMessageTextIndex( + self.db, + self.message_text_index_settings, + self._message_collection, + ) # Initialize related terms index self._related_terms_index = SqliteRelatedTermsIndex( self.db, self.related_term_index_settings.embedding_index_settings diff --git a/src/typeagent/storage/sqlite/reltermsindex.py b/src/typeagent/storage/sqlite/reltermsindex.py index 5fe3dd6..4fa15fb 100644 --- a/src/typeagent/storage/sqlite/reltermsindex.py +++ b/src/typeagent/storage/sqlite/reltermsindex.py @@ -312,8 +312,7 @@ def __init__(self, db: sqlite3.Connection, settings: TextEmbeddingIndexSettings) self.db = db # Initialize alias and fuzzy related terms indexes self._aliases = SqliteRelatedTermsAliases(db) - with utils.timelog("Initializing fuzzy related terms index"): - self._fuzzy_index = SqliteRelatedTermsFuzzy(db, settings) + self._fuzzy_index = SqliteRelatedTermsFuzzy(db, settings) @property def aliases(self) -> interfaces.ITermToRelatedTerms: From 4e9604c6d20ae0bd22907784bfc29d3ad4e2721f Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 13:57:05 -0800 Subject: [PATCH 10/18] Format messageindex.py --- src/typeagent/storage/sqlite/messageindex.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/typeagent/storage/sqlite/messageindex.py b/src/typeagent/storage/sqlite/messageindex.py index 4245169..8b7afd5 100644 --- a/src/typeagent/storage/sqlite/messageindex.py +++ b/src/typeagent/storage/sqlite/messageindex.py @@ -36,8 +36,12 @@ def __init__( cursor.execute("SELECT embedding FROM MessageTextIndex") rows = cursor.fetchall() if rows: - embeddings: list[NormalizedEmbedding] = [deserialize_embedding(row[0]) for row in rows] - embeddings_array = np.stack(embeddings, axis=0).astype(np.float32, copy=False) + embeddings: list[NormalizedEmbedding] = [ + deserialize_embedding(row[0]) for row in rows + ] + embeddings_array = np.stack(embeddings, axis=0).astype( + np.float32, copy=False + ) self._vectorbase.add_embeddings(None, embeddings_array) async def size(self) -> int: From 7e6c341a56637695cec0ca43f8d7dcb17e99a75c Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 13:57:30 -0800 Subject: [PATCH 11/18] Cosmetic (output) changes to ingest_email.py --- tools/ingest_email.py | 67 +++++++++++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 18 deletions(-) diff --git a/tools/ingest_email.py b/tools/ingest_email.py index 8ff3eb4..5646409 100644 --- a/tools/ingest_email.py +++ b/tools/ingest_email.py @@ -25,6 +25,7 @@ import argparse import asyncio +from email.header import decode_header from pathlib import Path import sys import time @@ -92,6 +93,20 @@ def collect_email_files(paths: list[str], verbose: bool) -> list[Path]: return email_files +def decode_encoded_word(s: str) -> str: + """Decode an RFC 2047 encoded string.""" + if "=?utf-8?" not in s: + return s # Fast path for common case + decoded_parts = decode_header(s) + decoded_string = "" + for part, encoding in decoded_parts: + if isinstance(part, bytes): + decoded_string += part.decode(encoding or "utf-8", errors="replace") + else: + decoded_string += part + return decoded_string + + async def ingest_emails( paths: list[str], database: str, @@ -108,7 +123,7 @@ async def ingest_emails( sys.exit(1) if verbose: - print(f"Found {len(email_files)} email files to ingest") + print(f"Found {len(email_files)} email files in total to ingest") # Load environment for model API access if verbose: @@ -152,47 +167,63 @@ async def ingest_emails( for i, email_file in enumerate(email_files): try: if verbose: - print(f"\n[{i + 1}/{len(email_files)}] {email_file}") + print(f"[{i + 1}/{len(email_files)}] {email_file}", end="", flush=True) + if status := storage_provider.get_source_status(str(email_file)): + skipped_count += 1 + if verbose: + print(f" [Previously {status}, skipping]") + continue + else: + if verbose: + print() email = import_email_from_file(str(email_file)) - email_id = email.metadata.id + source_id = email.metadata.id if verbose: - print(f" Imported email ID: {email_id}") + print(f" Email ID: {source_id}", end="") # Check if this email was already ingested - if email_id and (status := storage_provider.get_source_status(email_id)): + if source_id and (status := storage_provider.get_source_status(source_id)): skipped_count += 1 if verbose: - print(f" [Previously {status}, skipping]") + print(f" [Previously {status}, skipping]") + async with storage_provider: + storage_provider.mark_source_ingested(str(email_file), status) continue + else: + if verbose: + print() if verbose: print(f" From: {email.metadata.sender}") if email.metadata.subject: - print(f" Subject: {email.metadata.subject}") + print( + f" Subject: {decode_encoded_word(email.metadata.subject).replace('\n', '\\n')}" + ) print(f" Date: {email.timestamp}") print(f" Body chunks: {len(email.text_chunks)}") for chunk in email.text_chunks: - # Show first 200 chars of each chunk - preview = chunk[:200].replace("\n", " ") - if len(chunk) > 200: - preview += "..." + # Show first N chars of each decoded chunk + N = 150 + chunk = decode_encoded_word(chunk) + preview = repr(chunk[: N + 1])[1:-1] + if len(preview) > N: + preview = preview[: N - 3] + "..." print(f" {preview}") # Pass source_id to mark as ingested atomically with the message - source_ids = [email_id] if email_id else None await email_memory.add_messages_with_indexing( - [email], source_ids=source_ids - ) + [email], source_ids=[str(email_file)] + ) # This may raise, esp. if the knowledge extraction fails (see except below) successful_count += 1 # Print progress periodically - if not verbose and (i + 1) % batch_size == 0: + if (i + 1) % batch_size == 0: elapsed = time.time() - start_time semref_count = await semref_coll.size() print( - f" [{i + 1}/{len(email_files)}] {successful_count} imported | " - f"{semref_count} refs | {elapsed:.1f}s elapsed" + f"\n[{i + 1}/{len(email_files)}] {successful_count} imported | " + f"{semref_count} refs | {elapsed:.1f}s elapsed\n" ) except Exception as e: @@ -201,7 +232,7 @@ async def ingest_emails( if verbose: import traceback - traceback.print_exc() + traceback.print_exc(limit=10) # Final summary elapsed = time.time() - start_time From 1c9810af7d483c2f0d36ffb67958612eec638d1f Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 14:11:09 -0800 Subject: [PATCH 12/18] Update TODO list in ingest_email.py --- tools/ingest_email.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/ingest_email.py b/tools/ingest_email.py index 5646409..e3aa579 100644 --- a/tools/ingest_email.py +++ b/tools/ingest_email.py @@ -17,8 +17,6 @@ """ TODO -- Find out why storage creation is slow when db is large (Issue #166) -- Use filename as source ID (so we skip parsing if already ingested) - Catch auth errors and stop rather than marking as failed - Collect knowledge outside db transaction to reduce lock time """ From 35bf3bfe1e79faaca9741d567440c225d0fbe9f5 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 14:16:10 -0800 Subject: [PATCH 13/18] Remove unused imports --- src/typeagent/storage/sqlite/provider.py | 2 -- src/typeagent/storage/sqlite/reltermsindex.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/typeagent/storage/sqlite/provider.py b/src/typeagent/storage/sqlite/provider.py index 125da29..8ec0dd8 100644 --- a/src/typeagent/storage/sqlite/provider.py +++ b/src/typeagent/storage/sqlite/provider.py @@ -6,8 +6,6 @@ from datetime import datetime, timezone import sqlite3 -from typeagent.aitools import utils - from ...aitools.embeddings import AsyncEmbeddingModel from ...aitools.vectorbase import TextEmbeddingIndexSettings from ...knowpro import interfaces diff --git a/src/typeagent/storage/sqlite/reltermsindex.py b/src/typeagent/storage/sqlite/reltermsindex.py index 4fa15fb..dec29db 100644 --- a/src/typeagent/storage/sqlite/reltermsindex.py +++ b/src/typeagent/storage/sqlite/reltermsindex.py @@ -7,8 +7,6 @@ import numpy as np -from typeagent.aitools import utils - from ...aitools.embeddings import NormalizedEmbedding from ...aitools.vectorbase import TextEmbeddingIndexSettings, VectorBase from ...knowpro import interfaces From ecf45b5d19436eeaffdf5fae2b0719cc6df3e8eb Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 18:07:07 -0800 Subject: [PATCH 14/18] Optimize decode_encoded_word() using join() --- tools/ingest_email.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tools/ingest_email.py b/tools/ingest_email.py index e3aa579..f2b620a 100644 --- a/tools/ingest_email.py +++ b/tools/ingest_email.py @@ -96,13 +96,14 @@ def decode_encoded_word(s: str) -> str: if "=?utf-8?" not in s: return s # Fast path for common case decoded_parts = decode_header(s) - decoded_string = "" - for part, encoding in decoded_parts: - if isinstance(part, bytes): - decoded_string += part.decode(encoding or "utf-8", errors="replace") - else: - decoded_string += part - return decoded_string + return "".join( + ( + part.decode(encoding or "utf-8", errors="replace") + if isinstance(part, bytes) + else part + ) + for part, encoding in decoded_parts + ) async def ingest_emails( From 96f4fc1869d0a5b5585c0bfa6dca0e05800c0f03 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 20:42:45 -0800 Subject: [PATCH 15/18] add_messages_with_indexing() should never mark source ids as failures --- src/typeagent/knowpro/conversation_base.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/typeagent/knowpro/conversation_base.py b/src/typeagent/knowpro/conversation_base.py index 114de52..3a50c0c 100644 --- a/src/typeagent/knowpro/conversation_base.py +++ b/src/typeagent/knowpro/conversation_base.py @@ -147,9 +147,6 @@ async def add_messages_with_indexing( f"Length of source_ids {len(source_ids)} " f"must match length of messages {len(messages)}" ) - async with storage: - for source_id in source_ids: - storage.mark_source_ingested(source_id, "failed") async with storage: # Mark source IDs as ingested (will be rolled back on error) From 85318b9b75d89004abece029d207d2912d51b685 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 20:43:33 -0800 Subject: [PATCH 16/18] Upon Exception, set file status to exception name --- tools/ingest_email.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tools/ingest_email.py b/tools/ingest_email.py index f2b620a..ef4b623 100644 --- a/tools/ingest_email.py +++ b/tools/ingest_email.py @@ -228,6 +228,13 @@ async def ingest_emails( except Exception as e: failed_count += 1 print(f"Error processing {email_file}: {e}", file=sys.stderr) + exc_name = ( + e.__class__.__qualname__ + if e.__class__.__module__ == "builtins" + else f"{e.__class__.__module__}.{e.__class__.__qualname__}" + ) + async with storage_provider: + storage_provider.mark_source_ingested(str(email_file), exc_name) if verbose: import traceback From 9d006bfe2ebe91ecf5bbf4c451bc80537656790f Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 20:57:49 -0800 Subject: [PATCH 17/18] Make 'ingested' into STATUS_INGESTED constant --- src/typeagent/knowpro/conversation_base.py | 2 +- src/typeagent/knowpro/interfaces_storage.py | 6 +++++- src/typeagent/storage/memory/provider.py | 8 ++++++-- src/typeagent/storage/sqlite/provider.py | 8 ++++++-- src/typeagent/storage/sqlite/schema.py | 6 ++++-- src/typeagent/storage/utils.py | 2 ++ 6 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/typeagent/knowpro/conversation_base.py b/src/typeagent/knowpro/conversation_base.py index 3a50c0c..025b050 100644 --- a/src/typeagent/knowpro/conversation_base.py +++ b/src/typeagent/knowpro/conversation_base.py @@ -152,7 +152,7 @@ async def add_messages_with_indexing( # Mark source IDs as ingested (will be rolled back on error) if source_ids: for source_id in source_ids: - storage.mark_source_ingested(source_id, "ingested") + storage.mark_source_ingested(source_id) start_points = IndexingStartPoints( message_count=await self.messages.size(), diff --git a/src/typeagent/knowpro/interfaces_storage.py b/src/typeagent/knowpro/interfaces_storage.py index 0d2e7f6..3c2faf4 100644 --- a/src/typeagent/knowpro/interfaces_storage.py +++ b/src/typeagent/knowpro/interfaces_storage.py @@ -10,6 +10,8 @@ from pydantic.dataclasses import dataclass +from typeagent.storage.utils import STATUS_INGESTED + from .interfaces_core import ( IMessage, ITermToSemanticRefIndex, @@ -154,7 +156,9 @@ def get_source_status(self, source_id: str) -> str | None: """Get the ingestion status of a source.""" ... - def mark_source_ingested(self, source_id: str, status: str = "ingested") -> None: + def mark_source_ingested( + self, source_id: str, status: str = STATUS_INGESTED + ) -> None: """Mark a source as ingested (no commit; call within transaction context).""" ... diff --git a/src/typeagent/storage/memory/provider.py b/src/typeagent/storage/memory/provider.py index 0cee2ed..4daf02e 100644 --- a/src/typeagent/storage/memory/provider.py +++ b/src/typeagent/storage/memory/provider.py @@ -6,6 +6,8 @@ from datetime import datetime +from typeagent.storage.utils import STATUS_INGESTED + from ...knowpro.convsettings import MessageTextIndexSettings, RelatedTermIndexSettings from ...knowpro.interfaces import ( ConversationMetadata, @@ -160,10 +162,12 @@ def get_source_status(self, source_id: str) -> str | None: The ingestion status if the source has been ingested, None otherwise. """ if source_id in self._ingested_sources: - return "ingested" + return STATUS_INGESTED return None - def mark_source_ingested(self, source_id: str, status: str = "ingested") -> None: + def mark_source_ingested( + self, source_id: str, status: str = STATUS_INGESTED + ) -> None: """Mark a source as ingested. Args: diff --git a/src/typeagent/storage/sqlite/provider.py b/src/typeagent/storage/sqlite/provider.py index 8ec0dd8..51e9e5e 100644 --- a/src/typeagent/storage/sqlite/provider.py +++ b/src/typeagent/storage/sqlite/provider.py @@ -6,6 +6,8 @@ from datetime import datetime, timezone import sqlite3 +from typeagent.storage.utils import STATUS_INGESTED + from ...aitools.embeddings import AsyncEmbeddingModel from ...aitools.vectorbase import TextEmbeddingIndexSettings from ...knowpro import interfaces @@ -629,7 +631,7 @@ def is_source_ingested(self, source_id: str) -> bool: "SELECT status FROM IngestedSources WHERE source_id = ?", (source_id,) ) row = cursor.fetchone() - return row is not None and row[0] == "ingested" + return row is not None and row[0] == STATUS_INGESTED def get_source_status(self, source_id: str) -> str | None: """Get the ingestion status of a source. @@ -647,7 +649,9 @@ def get_source_status(self, source_id: str) -> str | None: row = cursor.fetchone() return row[0] if row else None - def mark_source_ingested(self, source_id: str, status: str = "ingested") -> None: + def mark_source_ingested( + self, source_id: str, status: str = STATUS_INGESTED + ) -> None: """Mark a source as ingested. This performs an INSERT but does NOT commit. It should be called within diff --git a/src/typeagent/storage/sqlite/schema.py b/src/typeagent/storage/sqlite/schema.py index f928333..9703adb 100644 --- a/src/typeagent/storage/sqlite/schema.py +++ b/src/typeagent/storage/sqlite/schema.py @@ -9,6 +9,8 @@ import numpy as np +from typeagent.storage.utils import STATUS_INGESTED + from ...aitools.embeddings import NormalizedEmbedding from ...knowpro.interfaces import ConversationMetadata @@ -144,7 +146,7 @@ INGESTED_SOURCES_SCHEMA = """ CREATE TABLE IF NOT EXISTS IngestedSources ( source_id TEXT PRIMARY KEY, -- External source identifier (email ID, file path, etc.) - status TEXT NOT NULL DEFAULT 'ingested' -- Status of the source (e.g., 'ingested', 'failed') + status TEXT NOT NULL DEFAULT ? -- Status of the source (e.g., 'ingested') ); """ @@ -270,7 +272,7 @@ def init_db_schema(db: sqlite3.Connection) -> None: cursor.execute(RELATED_TERMS_ALIASES_SCHEMA) cursor.execute(RELATED_TERMS_FUZZY_SCHEMA) cursor.execute(TIMESTAMP_INDEX_SCHEMA) - cursor.execute(INGESTED_SOURCES_SCHEMA) + cursor.execute(INGESTED_SOURCES_SCHEMA, (STATUS_INGESTED,)) # Create additional indexes cursor.execute(SEMANTIC_REF_INDEX_TERM_INDEX) diff --git a/src/typeagent/storage/utils.py b/src/typeagent/storage/utils.py index c993462..5049300 100644 --- a/src/typeagent/storage/utils.py +++ b/src/typeagent/storage/utils.py @@ -10,6 +10,8 @@ from ..knowpro.convsettings import MessageTextIndexSettings, RelatedTermIndexSettings from ..knowpro.interfaces import ConversationMetadata, IMessage, IStorageProvider +STATUS_INGESTED = "ingested" + async def create_storage_provider[TMessage: IMessage]( message_text_settings: MessageTextIndexSettings, From 770d1342d41f2758e7a4faabc318dabc7cd7961e Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 15 Jan 2026 21:15:15 -0800 Subject: [PATCH 18/18] Move STATUS_INGESTED definition to interfaces_storage.py --- src/typeagent/knowpro/interfaces_storage.py | 12 ++---------- src/typeagent/storage/memory/provider.py | 3 +-- src/typeagent/storage/sqlite/provider.py | 4 +--- src/typeagent/storage/sqlite/schema.py | 10 ++++------ src/typeagent/storage/utils.py | 2 -- 5 files changed, 8 insertions(+), 23 deletions(-) diff --git a/src/typeagent/knowpro/interfaces_storage.py b/src/typeagent/knowpro/interfaces_storage.py index 3c2faf4..a19834e 100644 --- a/src/typeagent/knowpro/interfaces_storage.py +++ b/src/typeagent/knowpro/interfaces_storage.py @@ -10,8 +10,6 @@ from pydantic.dataclasses import dataclass -from typeagent.storage.utils import STATUS_INGESTED - from .interfaces_core import ( IMessage, ITermToSemanticRefIndex, @@ -28,14 +26,7 @@ ITimestampToTextRangeIndex, ) -__all__ = [ - "ConversationMetadata", - "IReadonlyCollection", - "ICollection", - "IMessageCollection", - "ISemanticRefCollection", - "IStorageProvider", -] +STATUS_INGESTED = "ingested" @dataclass @@ -199,4 +190,5 @@ class IConversation[ "IReadonlyCollection", "ISemanticRefCollection", "IStorageProvider", + "STATUS_INGESTED", ] diff --git a/src/typeagent/storage/memory/provider.py b/src/typeagent/storage/memory/provider.py index 4daf02e..83ef6ab 100644 --- a/src/typeagent/storage/memory/provider.py +++ b/src/typeagent/storage/memory/provider.py @@ -6,8 +6,6 @@ from datetime import datetime -from typeagent.storage.utils import STATUS_INGESTED - from ...knowpro.convsettings import MessageTextIndexSettings, RelatedTermIndexSettings from ...knowpro.interfaces import ( ConversationMetadata, @@ -19,6 +17,7 @@ ITermToRelatedTermsIndex, ITermToSemanticRefIndex, ITimestampToTextRangeIndex, + STATUS_INGESTED, ) from .collections import MemoryMessageCollection, MemorySemanticRefCollection from .convthreads import ConversationThreads diff --git a/src/typeagent/storage/sqlite/provider.py b/src/typeagent/storage/sqlite/provider.py index 51e9e5e..b3a63a9 100644 --- a/src/typeagent/storage/sqlite/provider.py +++ b/src/typeagent/storage/sqlite/provider.py @@ -6,13 +6,11 @@ from datetime import datetime, timezone import sqlite3 -from typeagent.storage.utils import STATUS_INGESTED - from ...aitools.embeddings import AsyncEmbeddingModel from ...aitools.vectorbase import TextEmbeddingIndexSettings from ...knowpro import interfaces from ...knowpro.convsettings import MessageTextIndexSettings, RelatedTermIndexSettings -from ...knowpro.interfaces import ConversationMetadata +from ...knowpro.interfaces import ConversationMetadata, STATUS_INGESTED from .collections import SqliteMessageCollection, SqliteSemanticRefCollection from .messageindex import SqliteMessageTextIndex from .propindex import SqlitePropertyIndex diff --git a/src/typeagent/storage/sqlite/schema.py b/src/typeagent/storage/sqlite/schema.py index 9703adb..db6933d 100644 --- a/src/typeagent/storage/sqlite/schema.py +++ b/src/typeagent/storage/sqlite/schema.py @@ -9,10 +9,8 @@ import numpy as np -from typeagent.storage.utils import STATUS_INGESTED - from ...aitools.embeddings import NormalizedEmbedding -from ...knowpro.interfaces import ConversationMetadata +from ...knowpro.interfaces import ConversationMetadata, STATUS_INGESTED # Constants CONVERSATION_SCHEMA_VERSION = 1 @@ -143,10 +141,10 @@ # Table for tracking ingested source IDs (e.g., email IDs, file paths) # This prevents re-ingesting the same content on subsequent runs -INGESTED_SOURCES_SCHEMA = """ +INGESTED_SOURCES_SCHEMA = f""" CREATE TABLE IF NOT EXISTS IngestedSources ( source_id TEXT PRIMARY KEY, -- External source identifier (email ID, file path, etc.) - status TEXT NOT NULL DEFAULT ? -- Status of the source (e.g., 'ingested') + status TEXT NOT NULL DEFAULT {STATUS_INGESTED} -- Status of the source (e.g., 'ingested') ); """ @@ -272,7 +270,7 @@ def init_db_schema(db: sqlite3.Connection) -> None: cursor.execute(RELATED_TERMS_ALIASES_SCHEMA) cursor.execute(RELATED_TERMS_FUZZY_SCHEMA) cursor.execute(TIMESTAMP_INDEX_SCHEMA) - cursor.execute(INGESTED_SOURCES_SCHEMA, (STATUS_INGESTED,)) + cursor.execute(INGESTED_SOURCES_SCHEMA) # Create additional indexes cursor.execute(SEMANTIC_REF_INDEX_TERM_INDEX) diff --git a/src/typeagent/storage/utils.py b/src/typeagent/storage/utils.py index 5049300..c993462 100644 --- a/src/typeagent/storage/utils.py +++ b/src/typeagent/storage/utils.py @@ -10,8 +10,6 @@ from ..knowpro.convsettings import MessageTextIndexSettings, RelatedTermIndexSettings from ..knowpro.interfaces import ConversationMetadata, IMessage, IStorageProvider -STATUS_INGESTED = "ingested" - async def create_storage_provider[TMessage: IMessage]( message_text_settings: MessageTextIndexSettings,