diff --git a/api/apps/document_app.py b/api/apps/document_app.py index 92534a501d0..201a3f41fab 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -361,8 +361,9 @@ def run(): e, doc = DocumentService.get_by_id(id) if not e: return get_data_error_result(message="Document not found!") - if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id): - settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), doc.kb_id) + if req.get("delete", False): + if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id): + settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), doc.kb_id) if str(req["run"]) == TaskStatus.RUNNING.value: TaskService.filter_delete([Task.doc_id == id]) diff --git a/conf/infinity_mapping.json b/conf/infinity_mapping.json index 118f205f995..f60a7b4db9a 100644 --- a/conf/infinity_mapping.json +++ b/conf/infinity_mapping.json @@ -25,5 +25,6 @@ "available_int": {"type": "integer", "default": 1}, "knowledge_graph_kwd": {"type": "varchar", "default": ""}, "entities_kwd": {"type": "varchar", "default": ""}, - "pagerank_fea": {"type": "integer", "default": 0} + "pagerank_fea": {"type": "integer", "default": 0}, + "task_kwd": {"type": "varchar", "default": ""} } diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 902c1e31aef..308a000fd35 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -57,6 +57,7 @@ from rag.utils import rmSpace, num_tokens_from_string from rag.utils.redis_conn import REDIS_CONN, Payload from rag.utils.storage_factory import STORAGE_IMPL +from rag.utils.doc_store_conn import OrderByExpr BATCH_SIZE = 64 @@ -358,6 +359,22 @@ def run_raptor(row, chat_mdl, embd_mdl, callback=None): return res, tk_count, vector_size +def task_chunks_exist(task: dict) -> (str, bool): + md5 = hashlib.md5() + for field in ["task_type", "tenant_id", "kb_id", "doc_id", "name", "from_page", "to_page", "parser_config", "embd_id", + "language", "llm_id"]: + md5.update(str(task.get(field, "")).encode("utf-8")) + task_kwd = md5.hexdigest() + + fields = ["id"] + condition = {"task_kwd": task_kwd} + tenant_id = task["tenant_id"] + kb_ids = [task["kb_id"]] + res = settings.docStoreConn.search(fields, [], condition, [], OrderByExpr(), 0, 1, search.index_name(tenant_id), kb_ids) + dict_chunks = settings.docStoreConn.getFields(res, fields) + return task_kwd, len(dict_chunks) > 0 + + def do_handle_task(task): task_id = task["id"] task_from_page = task["from_page"] @@ -373,6 +390,11 @@ def do_handle_task(task): # prepare the progress callback function progress_callback = partial(set_progress, task_id, task_from_page, task_to_page) + task_kwd, found = task_chunks_exist(task) + if found: + progress_callback(1., msg=f"Chunks of task {task_kwd} already exist, skip.") + return + try: # bind embedding model embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language) @@ -420,6 +442,9 @@ def do_handle_task(task): progress_message = "Embedding chunks ({:.2f}s)".format(timer() - start_ts) logging.info(progress_message) progress_callback(msg=progress_message) + + for chunk in chunks: + chunk["task_kwd"] = task_kwd # logging.info(f"task_executor init_kb index {search.index_name(task_tenant_id)} embedding_model {embedding_model.llm_name} vector length {vector_size}") init_kb(task, vector_size) chunk_count = len(set([chunk["id"] for chunk in chunks])) @@ -434,8 +459,6 @@ def do_handle_task(task): if doc_store_result: error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!" progress_callback(-1, msg=error_message) - settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id) - logging.error(error_message) raise Exception(error_message) if TaskService.do_cancel(task_id):