diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index ab5aff96..f7f87c2b 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -215,7 +215,27 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool: | ".scss" | ".less" ): - content = file.decode("utf-8") + try: + # Try to decode as UTF-8 + content = file.decode("utf-8") + + # Validate content + if not content or len(content.strip()) == 0: + logger.error(f"Empty content in file: {file_path.name}") + return False + + # Check if content looks like binary data string representation + if content.startswith("b'") or content.startswith('b"'): + logger.error( + f"File {file_path.name} appears to contain binary data representation instead of text" + ) + return False + + except UnicodeDecodeError: + logger.error( + f"File {file_path.name} is not valid UTF-8 encoded text. Please convert it to UTF-8 before processing." + ) + return False case ".pdf": if not pm.is_installed("pypdf2"): pm.install("pypdf2") @@ -229,7 +249,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool: case ".docx": if not pm.is_installed("docx"): pm.install("docx") - from docx import Document + from docx import Document # type: ignore from io import BytesIO docx_file = BytesIO(file) @@ -238,7 +258,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool: case ".pptx": if not pm.is_installed("pptx"): pm.install("pptx") - from pptx import Presentation + from pptx import Presentation # type: ignore from io import BytesIO pptx_file = BytesIO(file) @@ -250,7 +270,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool: case ".xlsx": if not pm.is_installed("openpyxl"): pm.install("openpyxl") - from openpyxl import load_workbook + from openpyxl import load_workbook # type: ignore from io import BytesIO xlsx_file = BytesIO(file) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 208bdf3e..7ac5ece9 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -670,8 +670,24 @@ async def apipeline_enqueue_documents( all_new_doc_ids = set(new_docs.keys()) # Exclude IDs of documents that are already in progress unique_new_doc_ids = await self.doc_status.filter_keys(all_new_doc_ids) + + # Log ignored document IDs + ignored_ids = [ + doc_id for doc_id in unique_new_doc_ids if doc_id not in new_docs + ] + if ignored_ids: + logger.warning( + f"Ignoring {len(ignored_ids)} document IDs not found in new_docs" + ) + for doc_id in ignored_ids: + logger.warning(f"Ignored document ID: {doc_id}") + # Filter new_docs to only include documents with unique IDs - new_docs = {doc_id: new_docs[doc_id] for doc_id in unique_new_doc_ids} + new_docs = { + doc_id: new_docs[doc_id] + for doc_id in unique_new_doc_ids + if doc_id in new_docs + } if not new_docs: logger.info("No new unique documents were found.") diff --git a/run_with_gunicorn.py b/run_with_gunicorn.py deleted file mode 100755 index 2e4e3cf7..00000000 --- a/run_with_gunicorn.py +++ /dev/null @@ -1,203 +0,0 @@ -#!/usr/bin/env python -""" -Start LightRAG server with Gunicorn -""" - -import os -import sys -import signal -import pipmaster as pm -from lightrag.api.utils_api import parse_args, display_splash_screen -from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data - - -def check_and_install_dependencies(): - """Check and install required dependencies""" - required_packages = [ - "gunicorn", - "tiktoken", - "psutil", - # Add other required packages here - ] - - for package in required_packages: - if not pm.is_installed(package): - print(f"Installing {package}...") - pm.install(package) - print(f"{package} installed successfully") - - -# Signal handler for graceful shutdown -def signal_handler(sig, frame): - print("\n\n" + "=" * 80) - print("RECEIVED TERMINATION SIGNAL") - print(f"Process ID: {os.getpid()}") - print("=" * 80 + "\n") - - # Release shared resources - finalize_share_data() - - # Exit with success status - sys.exit(0) - - -def main(): - # Check and install dependencies - check_and_install_dependencies() - - # Register signal handlers for graceful shutdown - signal.signal(signal.SIGINT, signal_handler) # Ctrl+C - signal.signal(signal.SIGTERM, signal_handler) # kill command - - # Parse all arguments using parse_args - args = parse_args(is_uvicorn_mode=False) - - # Display startup information - display_splash_screen(args) - - print("🚀 Starting LightRAG with Gunicorn") - print(f"🔄 Worker management: Gunicorn (workers={args.workers})") - print("🔍 Preloading app: Enabled") - print("📝 Note: Using Gunicorn's preload feature for shared data initialization") - print("\n\n" + "=" * 80) - print("MAIN PROCESS INITIALIZATION") - print(f"Process ID: {os.getpid()}") - print(f"Workers setting: {args.workers}") - print("=" * 80 + "\n") - - # Import Gunicorn's StandaloneApplication - from gunicorn.app.base import BaseApplication - - # Define a custom application class that loads our config - class GunicornApp(BaseApplication): - def __init__(self, app, options=None): - self.options = options or {} - self.application = app - super().__init__() - - def load_config(self): - # Define valid Gunicorn configuration options - valid_options = { - "bind", - "workers", - "worker_class", - "timeout", - "keepalive", - "preload_app", - "errorlog", - "accesslog", - "loglevel", - "certfile", - "keyfile", - "limit_request_line", - "limit_request_fields", - "limit_request_field_size", - "graceful_timeout", - "max_requests", - "max_requests_jitter", - } - - # Special hooks that need to be set separately - special_hooks = { - "on_starting", - "on_reload", - "on_exit", - "pre_fork", - "post_fork", - "pre_exec", - "pre_request", - "post_request", - "worker_init", - "worker_exit", - "nworkers_changed", - "child_exit", - } - - # Import and configure the gunicorn_config module - import gunicorn_config - - # Set configuration variables in gunicorn_config, prioritizing command line arguments - gunicorn_config.workers = ( - args.workers if args.workers else int(os.getenv("WORKERS", 1)) - ) - - # Bind configuration prioritizes command line arguments - host = args.host if args.host != "0.0.0.0" else os.getenv("HOST", "0.0.0.0") - port = args.port if args.port != 9621 else int(os.getenv("PORT", 9621)) - gunicorn_config.bind = f"{host}:{port}" - - # Log level configuration prioritizes command line arguments - gunicorn_config.loglevel = ( - args.log_level.lower() - if args.log_level - else os.getenv("LOG_LEVEL", "info") - ) - - # Timeout configuration prioritizes command line arguments - gunicorn_config.timeout = ( - args.timeout if args.timeout else int(os.getenv("TIMEOUT", 150)) - ) - - # Keepalive configuration - gunicorn_config.keepalive = int(os.getenv("KEEPALIVE", 5)) - - # SSL configuration prioritizes command line arguments - if args.ssl or os.getenv("SSL", "").lower() in ( - "true", - "1", - "yes", - "t", - "on", - ): - gunicorn_config.certfile = ( - args.ssl_certfile - if args.ssl_certfile - else os.getenv("SSL_CERTFILE") - ) - gunicorn_config.keyfile = ( - args.ssl_keyfile if args.ssl_keyfile else os.getenv("SSL_KEYFILE") - ) - - # Set configuration options from the module - for key in dir(gunicorn_config): - if key in valid_options: - value = getattr(gunicorn_config, key) - # Skip functions like on_starting and None values - if not callable(value) and value is not None: - self.cfg.set(key, value) - # Set special hooks - elif key in special_hooks: - value = getattr(gunicorn_config, key) - if callable(value): - self.cfg.set(key, value) - - if hasattr(gunicorn_config, "logconfig_dict"): - self.cfg.set( - "logconfig_dict", getattr(gunicorn_config, "logconfig_dict") - ) - - def load(self): - # Import the application - from lightrag.api.lightrag_server import get_application - - return get_application(args) - - # Create the application - app = GunicornApp("") - - # Force workers to be an integer and greater than 1 for multi-process mode - workers_count = int(args.workers) - if workers_count > 1: - # Set a flag to indicate we're in the main process - os.environ["LIGHTRAG_MAIN_PROCESS"] = "1" - initialize_share_data(workers_count) - else: - initialize_share_data(1) - - # Run the application - print("\nStarting Gunicorn with direct Python API...") - app.run() - - -if __name__ == "__main__": - main()