From 2efb4a39e0d77003070c8f489f0223ebacd0baa1 Mon Sep 17 00:00:00 2001 From: jimmoffet Date: Wed, 6 Nov 2024 21:41:21 -0800 Subject: [PATCH] stringify table names and sleep on scans --- backend/apps/rag/clients/vector_client.py | 22 ++-- backend/apps/rag/main.py | 112 +++++++++--------- backend/main.py | 4 +- src/lib/components/workspace/Documents.svelte | 40 +++---- 4 files changed, 92 insertions(+), 86 deletions(-) diff --git a/backend/apps/rag/clients/vector_client.py b/backend/apps/rag/clients/vector_client.py index 6651bf261..d1fcbffaf 100644 --- a/backend/apps/rag/clients/vector_client.py +++ b/backend/apps/rag/clients/vector_client.py @@ -49,7 +49,7 @@ def has_collection(self, collection_name: str) -> bool: return result and result[0] def delete_collection(self, collection_name: str): - self.conn.execute(f"DROP TABLE IF EXISTS {collection_name}") + self.conn.execute(f'DROP TABLE IF EXISTS "{collection_name}"') def search( self, collection_name: str, vectors: list[list[float]], limit: int @@ -59,7 +59,7 @@ def search( # self.conn.execute("SET LOCAL hnsw.ef_search = 64") query = f""" SELECT id, content, (embedding <=> %s::vector) AS distance, metadata - FROM {collection_name} + FROM "{collection_name}" ORDER BY distance LIMIT %s; """ @@ -84,7 +84,7 @@ def query( try: where_clause = " AND ".join([f"{key} = %({key})s" for key in filter.keys()]) result = self.conn.execute( - f"SELECT id, content, metadata FROM {collection_name} WHERE {where_clause} LIMIT %(limit)s;", + f'SELECT id, content, metadata FROM "{collection_name}" WHERE {where_clause} LIMIT %(limit)s;', {**filter, "limit": limit}, ).fetchall() @@ -102,7 +102,7 @@ def query( def get(self, collection_name: str) -> Optional[GetResult]: try: result = self.conn.execute( - f"SELECT id, content, metadata FROM {collection_name};" + f'SELECT id, content, metadata FROM "{collection_name}";' ).fetchall() if result: @@ -120,13 +120,13 @@ def insert(self, collection_name: str, items: list[VectorItem]): try: if not self.has_collection(collection_name): self.conn.execute( - f"CREATE TABLE {collection_name} " + f'CREATE TABLE "{collection_name}" ' f"(id bigserial PRIMARY KEY, content text, embedding vector(1536), metadata jsonb)" ) for item in items: self.conn.execute( - f"INSERT INTO {collection_name} (content, embedding, metadata) VALUES (%s, %s, %s)", + f'INSERT INTO "{collection_name}" (content, embedding, metadata) VALUES (%s, %s, %s)', (item["text"], item["vector"], json.dumps(item["metadata"])), ) except Exception as e: @@ -136,14 +136,14 @@ def upsert(self, collection_name: str, items: list[VectorItem]): try: if not self.has_collection(collection_name): self.conn.execute( - f"CREATE TABLE {collection_name} " + f'CREATE TABLE "{collection_name}" ' f"(id bigserial PRIMARY KEY, content text, embedding vector(1536), metadata jsonb)" ) for item in items: self.conn.execute( f""" - INSERT INTO {collection_name} (content, embedding, metadata) + INSERT INTO "{collection_name}" (content, embedding, metadata) VALUES (%s, %s, %s) ON CONFLICT (id) DO UPDATE SET content = EXCLUDED.content, embedding = EXCLUDED.embedding, metadata = EXCLUDED.metadata @@ -164,7 +164,7 @@ def delete( try: if ids: self.conn.execute( - f"DELETE FROM {collection_name} WHERE id IN %(ids)s;", + f'DELETE FROM "{collection_name}" WHERE id IN %(ids)s;', {"ids": tuple(ids)}, ) elif filter: @@ -172,10 +172,10 @@ def delete( [f"{key} = %({key})s" for key in filter.keys()] ) self.conn.execute( - f"DELETE FROM {collection_name} WHERE {where_clause};", filter + f'DELETE FROM "{collection_name}" WHERE {where_clause};', filter ) elif not ids and not filter: # Check if both are empty - self.conn.execute(f"DROP TABLE IF EXISTS {collection_name};") + self.conn.execute(f'DROP TABLE IF EXISTS "{collection_name}";') return True except Exception as e: diff --git a/backend/apps/rag/main.py b/backend/apps/rag/main.py index fc0ff6f55..fe122e637 100644 --- a/backend/apps/rag/main.py +++ b/backend/apps/rag/main.py @@ -1,3 +1,4 @@ +from asyncio import sleep from fastapi import ( FastAPI, Depends, @@ -1211,61 +1212,66 @@ def store_text( @app.get("/scan") async def scan_docs_dir(user=Depends(get_admin_user)): - for path in Path(DOCS_DIR).rglob("./**/*"): - try: - if path.is_file() and not path.name.startswith("."): - tags = extract_folders_after_data_docs(path) - filename = path.name - file_content_type = mimetypes.guess_type(path) - - f = open(path, "rb") - collection_name = calculate_sha256(f)[:63] - f.close() - - loader, known_type = get_loader( - filename, file_content_type[0], str(path) - ) - data = loader.load() - - try: - result = await store_data_in_vector_db(data, collection_name) - - if result: - sanitized_filename = sanitize_filename(filename) - doc = Documents.get_doc_by_name(sanitized_filename) - - if doc is None: - doc = Documents.insert_new_doc( - user.id, - DocumentForm( - **{ - "name": sanitized_filename, - "title": filename, - "collection_name": collection_name, - "filename": filename, - "content": ( - json.dumps( - { - "tags": list( - map( - lambda name: {"name": name}, - tags, + for dir in [DOCS_DIR, UPLOAD_DIR]: + for path in Path(dir).rglob("./**/*"): + try: + if path.is_file() and not path.name.startswith("."): + tags = extract_folders_after_data_docs(path) + filename = path.name + file_content_type = mimetypes.guess_type(path) + + f = open(path, "rb") + log.info(f"Scanning {path}") + collection_name = calculate_sha256(f)[:63] + f.close() + + loader, known_type = get_loader( + filename, file_content_type[0], str(path) + ) + data = loader.load() + + try: + await sleep(60) + result = await store_data_in_vector_db(data, collection_name) + + if result: + sanitized_filename = sanitize_filename(filename) + doc = Documents.get_doc_by_name(sanitized_filename) + + if doc is None: + doc = Documents.insert_new_doc( + user.id, + DocumentForm( + **{ + "name": sanitized_filename, + "title": filename, + "collection_name": collection_name, + "filename": filename, + "content": ( + json.dumps( + { + "tags": list( + map( + lambda name: { + "name": name + }, + tags, + ) ) - ) - } - ) - if len(tags) - else "{}" - ), - } - ), - ) - except Exception as e: - log.exception(e) - pass + } + ) + if len(tags) + else "{}" + ), + } + ), + ) + except Exception as e: + log.exception(e) + pass - except Exception as e: - log.exception(e) + except Exception as e: + log.exception(e) return True diff --git a/backend/main.py b/backend/main.py index abcd60203..7451aab1a 100644 --- a/backend/main.py +++ b/backend/main.py @@ -399,14 +399,14 @@ async def dispatch(self, request: Request, call_next): del data["docs"] - log.info(f"rag_context: {rag_context}, citations: {citations}") + log.debug(f"rag_context: {rag_context}, citations: {citations}") if context != "": system_prompt = rag_template( rag_app.state.config.RAG_TEMPLATE, context, prompt ) - print(system_prompt) + log.info(system_prompt) data["messages"] = add_or_update_system_message( f"\n{system_prompt}", data["messages"] diff --git a/src/lib/components/workspace/Documents.svelte b/src/lib/components/workspace/Documents.svelte index 921a79b47..8302e0875 100644 --- a/src/lib/components/workspace/Documents.svelte +++ b/src/lib/components/workspace/Documents.svelte @@ -463,26 +463,26 @@ + class="self-center w-fit text-sm px-2 py-2 border dark:border-gray-600 rounded-xl" + type="button" + on:click={() => { + console.log('download file'); + }} + > + + + + + -->