# Refactor parse progress (infiniflow#3781)
### What problem does this PR solve?

Refactor file-parsing progress reporting.

### Type of change

- [x] Refactoring

Signed-off-by: jinhai <haijin.chn@gmail.com>
JinHai-CN authored Dec 1, 2024 · 1 parent ea84cc2 · commit 08c1a5e
Showing 2 changed files with 73 additions and 74 deletions.
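The heart of the refactor is that `build` becomes `build_chunks(task, progress_callback)`: instead of assembling its own `callback` from the task row with `partial(set_progress, ...)`, the builder now receives the progress callback from its caller, and `do_handle_task` passes in the same `progress_callback` it already uses. A minimal sketch of the new calling convention, assuming the callback is still built with `partial(set_progress, ...)` as in the removed code; `set_progress` and `build_chunks` are the functions defined in `rag/svr/task_executor.py`, while the wrapper itself is hypothetical:

```python
from functools import partial

# Sketch only: set_progress and build_chunks are the module-level functions in
# rag/svr/task_executor.py; this wrapper is a hypothetical caller illustrating
# the new convention, not code from the commit.
def run_standard_chunking(task: dict) -> list:
    # The caller owns progress reporting: bind the callback to this task once...
    progress_callback = partial(set_progress, task["id"], task["from_page"], task["to_page"])
    # ...then pass it to the renamed builder instead of letting it construct one internally.
    return build_chunks(task, progress_callback)
```

The same callback is what the file-format parsers in `rag/app/naive.py` receive as `callback`, so every stage reports into a single progress channel.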
rag/app/naive.py (22 changes: 10 additions, 12 deletions)

@@ -193,7 +193,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
         Next, these successive pieces are merge into chunks whose token number is no more than 'Max token number'.
     """
 
-    eng = lang.lower() == "english" # is_english(cks)
+    is_english = lang.lower() == "english" # is_english(cks)
     parser_config = kwargs.get(
         "parser_config", {
             "chunk_token_num": 128, "delimiter": "\n!?。;!?", "layout_recognize": True})
@@ -206,8 +206,8 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
     pdf_parser = None
     if re.search(r"\.docx$", filename, re.IGNORECASE):
         callback(0.1, "Start to parse.")
-        sections, tbls = Docx()(filename, binary)
-        res = tokenize_table(tbls, doc, eng) # just for table
+        sections, tables = Docx()(filename, binary)
+        res = tokenize_table(tables, doc, is_english) # just for table
 
         callback(0.8, "Finish parsing.")
         st = timer()
@@ -220,16 +220,14 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
         if kwargs.get("section_only", False):
             return chunks
 
-        res.extend(tokenize_chunks_docx(chunks, doc, eng, images))
+        res.extend(tokenize_chunks_docx(chunks, doc, is_english, images))
         logging.info("naive_merge({}): {}".format(filename, timer() - st))
         return res
 
     elif re.search(r"\.pdf$", filename, re.IGNORECASE):
-        pdf_parser = Pdf(
-        ) if parser_config.get("layout_recognize", True) else PlainParser()
-        sections, tbls = pdf_parser(filename if not binary else binary,
-                                    from_page=from_page, to_page=to_page, callback=callback)
-        res = tokenize_table(tbls, doc, eng)
+        pdf_parser = Pdf() if parser_config.get("layout_recognize", True) else PlainParser()
+        sections, tables = pdf_parser(filename if not binary else binary, from_page=from_page, to_page=to_page, callback=callback)
+        res = tokenize_table(tables, doc, is_english)
 
     elif re.search(r"\.xlsx?$", filename, re.IGNORECASE):
         callback(0.1, "Start to parse.")
@@ -248,8 +246,8 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
 
     elif re.search(r"\.(md|markdown)$", filename, re.IGNORECASE):
         callback(0.1, "Start to parse.")
-        sections, tbls = Markdown(int(parser_config.get("chunk_token_num", 128)))(filename, binary)
-        res = tokenize_table(tbls, doc, eng)
+        sections, tables = Markdown(int(parser_config.get("chunk_token_num", 128)))(filename, binary)
+        res = tokenize_table(tables, doc, is_english)
         callback(0.8, "Finish parsing.")
 
     elif re.search(r"\.(htm|html)$", filename, re.IGNORECASE):
@@ -289,7 +287,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
     if kwargs.get("section_only", False):
         return chunks
 
-    res.extend(tokenize_chunks(chunks, doc, eng, pdf_parser))
+    res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser))
     logging.info("naive_merge({}): {}".format(filename, timer() - st))
     return res
 
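Within `chunk()`, the callback is the only progress channel: parsers report a fraction plus a short message at milestones (`callback(0.1, "Start to parse.")`, `callback(0.8, "Finish parsing.")`), and the executor side uses `-1` to mark a failure. A rough sketch of that contract with hypothetical names; `dummy_parse` and the no-op default are illustrative, not RAGFlow code:

```python
from typing import Callable, Optional

# Illustrative only: mirrors how rag/app/naive.py drives its progress callback;
# the parser body is a placeholder.
def dummy_parse(filename: str, callback: Optional[Callable] = None) -> list:
    callback = callback or (lambda prog=None, msg="": None)  # tolerate callers that pass nothing
    callback(0.1, "Start to parse.")                    # early milestone: parsing has started
    sections = [f"section extracted from {filename}"]   # stand-in for real parsing work
    callback(0.8, "Finish parsing.")                    # late milestone: raw parsing done, merging still ahead
    return sections
```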
rag/svr/task_executor.py (125 changes: 63 additions, 62 deletions)

@@ -19,6 +19,7 @@
 
 import logging
 import sys
+
 from api.utils.log_utils import initRootLogger
 
 CONSUMER_NO = "0" if len(sys.argv) < 2 else sys.argv[1]
@@ -166,52 +167,44 @@ def get_storage_binary(bucket, name):
     return STORAGE_IMPL.get(bucket, name)
 
 
-def build(row):
-    if row["size"] > DOC_MAXIMUM_SIZE:
-        set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
-                     (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
+def build_chunks(task, progress_callback):
+    if task["size"] > DOC_MAXIMUM_SIZE:
+        set_progress(task["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
+                     (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
         return []
 
-    callback = partial(
-        set_progress,
-        row["id"],
-        row["from_page"],
-        row["to_page"])
-    chunker = FACTORY[row["parser_id"].lower()]
+    chunker = FACTORY[task["parser_id"].lower()]
    try:
         st = timer()
-        bucket, name = File2DocumentService.get_storage_address(doc_id=row["doc_id"])
+        bucket, name = File2DocumentService.get_storage_address(doc_id=task["doc_id"])
         binary = get_storage_binary(bucket, name)
-        logging.info(
-            "From minio({}) {}/{}".format(timer() - st, row["location"], row["name"]))
+        logging.info("From minio({}) {}/{}".format(timer() - st, task["location"], task["name"]))
     except TimeoutError:
-        callback(-1, "Internal server error: Fetch file from minio timeout. Could you try it again.")
-        logging.exception(
-            "Minio {}/{} got timeout: Fetch file from minio timeout.".format(row["location"], row["name"]))
+        progress_callback(-1, "Internal server error: Fetch file from minio timeout. Could you try it again.")
+        logging.exception("Minio {}/{} got timeout: Fetch file from minio timeout.".format(task["location"], task["name"]))
         raise
     except Exception as e:
         if re.search("(No such file|not found)", str(e)):
-            callback(-1, "Can not find file <%s> from minio. Could you try it again?" % row["name"])
+            progress_callback(-1, "Can not find file <%s> from minio. Could you try it again?" % task["name"])
         else:
-            callback(-1, "Get file from minio: %s" % str(e).replace("'", ""))
-        logging.exception("Chunking {}/{} got exception".format(row["location"], row["name"]))
+            progress_callback(-1, "Get file from minio: %s" % str(e).replace("'", ""))
+        logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))
         raise
 
     try:
-        cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
-                            to_page=row["to_page"], lang=row["language"], callback=callback,
-                            kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
-        logging.info("Chunking({}) {}/{} done".format(timer() - st, row["location"], row["name"]))
+        cks = chunker.chunk(task["name"], binary=binary, from_page=task["from_page"],
+                            to_page=task["to_page"], lang=task["language"], callback=progress_callback,
+                            kb_id=task["kb_id"], parser_config=task["parser_config"], tenant_id=task["tenant_id"])
+        logging.info("Chunking({}) {}/{} done".format(timer() - st, task["location"], task["name"]))
     except Exception as e:
-        callback(-1, "Internal server error while chunking: %s" %
-                 str(e).replace("'", ""))
-        logging.exception("Chunking {}/{} got exception".format(row["location"], row["name"]))
+        progress_callback(-1, "Internal server error while chunking: %s" % str(e).replace("'", ""))
+        logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))
         raise
 
     docs = []
     doc = {
-        "doc_id": row["doc_id"],
-        "kb_id": str(row["kb_id"])
+        "doc_id": task["doc_id"],
+        "kb_id": str(task["kb_id"])
     }
     el = 0
     for ck in cks:
@@ -240,41 +233,40 @@ def build(row):
                 d["image"].save(output_buffer, format='JPEG')
 
                 st = timer()
-                STORAGE_IMPL.put(row["kb_id"], d["id"], output_buffer.getvalue())
+                STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue())
                 el += timer() - st
             except Exception:
-                logging.exception(
-                    "Saving image of chunk {}/{}/{} got exception".format(row["location"], row["name"], d["_id"]))
+                logging.exception("Saving image of chunk {}/{}/{} got exception".format(task["location"], task["name"], d["_id"]))
                 raise
 
-            d["img_id"] = "{}-{}".format(row["kb_id"], d["id"])
+            d["img_id"] = "{}-{}".format(task["kb_id"], d["id"])
             del d["image"]
             docs.append(d)
-    logging.info("MINIO PUT({}):{}".format(row["name"], el))
+    logging.info("MINIO PUT({}):{}".format(task["name"], el))
 
-    if row["parser_config"].get("auto_keywords", 0):
+    if task["parser_config"].get("auto_keywords", 0):
         st = timer()
-        callback(msg="Start to generate keywords for every chunk ...")
-        chat_mdl = LLMBundle(row["tenant_id"], LLMType.CHAT, llm_name=row["llm_id"], lang=row["language"])
+        progress_callback(msg="Start to generate keywords for every chunk ...")
+        chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])
         for d in docs:
             d["important_kwd"] = keyword_extraction(chat_mdl, d["content_with_weight"],
-                                                    row["parser_config"]["auto_keywords"]).split(",")
+                                                    task["parser_config"]["auto_keywords"]).split(",")
             d["important_tks"] = rag_tokenizer.tokenize(" ".join(d["important_kwd"]))
-        callback(msg="Keywords generation completed in {:.2f}s".format(timer() - st))
+        progress_callback(msg="Keywords generation completed in {:.2f}s".format(timer() - st))
 
-    if row["parser_config"].get("auto_questions", 0):
+    if task["parser_config"].get("auto_questions", 0):
         st = timer()
-        callback(msg="Start to generate questions for every chunk ...")
-        chat_mdl = LLMBundle(row["tenant_id"], LLMType.CHAT, llm_name=row["llm_id"], lang=row["language"])
+        progress_callback(msg="Start to generate questions for every chunk ...")
+        chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])
         for d in docs:
-            qst = question_proposal(chat_mdl, d["content_with_weight"], row["parser_config"]["auto_questions"])
+            qst = question_proposal(chat_mdl, d["content_with_weight"], task["parser_config"]["auto_questions"])
             d["content_with_weight"] = f"Question: \n{qst}\n\nAnswer:\n" + d["content_with_weight"]
             qst = rag_tokenizer.tokenize(qst)
             if "content_ltks" in d:
                 d["content_ltks"] += " " + qst
             if "content_sm_ltks" in d:
                 d["content_sm_ltks"] += " " + rag_tokenizer.fine_grained_tokenize(qst)
-        callback(msg="Question generation completed in {:.2f}s".format(timer() - st))
+        progress_callback(msg="Question generation completed in {:.2f}s".format(timer() - st))
 
     return docs
 
@@ -389,7 +381,9 @@ def do_handle_task(task):
         # bind embedding model
         embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
     except Exception as e:
-        progress_callback(-1, msg=f'Fail to bind embedding model: {str(e)}')
+        error_message = f'Fail to bind embedding model: {str(e)}'
+        progress_callback(-1, msg=error_message)
+        logging.exception(error_message)
        raise
 
     # Either using RAPTOR or Standard chunking methods
@@ -399,14 +393,16 @@
             chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)
 
             # run RAPTOR
-            chunks, tk_count, vector_size = run_raptor(task, chat_model, embedding_model, progress_callback)
+            chunks, token_count, vector_size = run_raptor(task, chat_model, embedding_model, progress_callback)
         except Exception as e:
-            progress_callback(-1, msg=f'Fail to bind LLM used by RAPTOR: {str(e)}')
+            error_message = f'Fail to bind LLM used by RAPTOR: {str(e)}'
+            progress_callback(-1, msg=error_message)
+            logging.exception(error_message)
             raise
     else:
         # Standard chunking methods
         start_ts = timer()
-        chunks = build(task)
+        chunks = build_chunks(task, progress_callback)
         logging.info("Build document {}: {:.2f}s".format(task_document_name, timer() - start_ts))
         if chunks is None:
             return
@@ -418,38 +414,43 @@
     progress_callback(msg="Generate {} chunks".format(len(chunks)))
     start_ts = timer()
     try:
-        tk_count, vector_size = embedding(chunks, embedding_model, task_parser_config, progress_callback)
+        token_count, vector_size = embedding(chunks, embedding_model, task_parser_config, progress_callback)
     except Exception as e:
-        progress_callback(-1, "Generate embedding error:{}".format(str(e)))
-        logging.exception("run_embedding got exception")
-        tk_count = 0
+        error_message = "Generate embedding error:{}".format(str(e))
+        progress_callback(-1, error_message)
+        logging.exception(error_message)
+        token_count = 0
         raise
-    logging.info("Embedding {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
-    progress_callback(msg="Embedding chunks ({:.2f}s)".format(timer() - start_ts))
+    progress_message = "Embedding chunks ({:.2f}s)".format(timer() - start_ts)
+    logging.info(progress_message)
+    progress_callback(msg=progress_message)
     # logging.info(f"task_executor init_kb index {search.index_name(task_tenant_id)} embedding_model {embedding_model.llm_name} vector length {vector_size}")
     init_kb(task, vector_size)
     chunk_count = len(set([chunk["id"] for chunk in chunks]))
     start_ts = timer()
-    es_r = ""
+    doc_store_result = ""
     es_bulk_size = 4
     for b in range(0, len(chunks), es_bulk_size):
-        es_r = settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(task_tenant_id), task_dataset_id)
+        doc_store_result = settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(task_tenant_id), task_dataset_id)
         if b % 128 == 0:
             progress_callback(prog=0.8 + 0.1 * (b + 1) / len(chunks), msg="")
     logging.info("Indexing {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
-    if es_r:
-        progress_callback(-1, "Insert chunk error, detail info please check log file. Please also check Elasticsearch/Infinity status!")
+    if doc_store_result:
+        error_message = "Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
+        progress_callback(-1, msg=error_message)
         settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
-        logging.error('Insert chunk error: ' + str(es_r))
-        raise Exception('Insert chunk error: ' + str(es_r))
+        logging.error(error_message)
+        raise Exception(error_message)
 
     if TaskService.do_cancel(task_id):
         settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
         return
 
-    progress_callback(1., msg="Finish Index ({:.2f}s)".format(timer() - start_ts))
-    DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, tk_count, chunk_count, 0)
-    logging.info("Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(task_id, tk_count, len(chunks), timer() - start_ts))
+    DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
+
+    time_cost = timer() - start_ts
+    progress_callback(prog=1.0, msg="Done ({:.2f}s)".format(time_cost))
+    logging.info("Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(task_id, token_count, len(chunks), time_cost))
 
 
 def handle_task():
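A pattern that repeats through the `do_handle_task` changes: build each error or progress string once, then hand the same text to both `progress_callback` and the logger, so the message shown in the UI and the one in the log cannot drift apart. A condensed sketch of that pattern; `embedding`, `chunks`, and `progress_callback` stand in for the names used in `rag/svr/task_executor.py`:

```python
import logging

# Condensed illustration of the "compute the message once" pattern adopted in
# this commit; embedding() and progress_callback are stand-ins for the real
# task_executor names, not new API.
def embed_with_reporting(chunks, embedding_model, parser_config, progress_callback):
    try:
        token_count, vector_size = embedding(chunks, embedding_model, parser_config, progress_callback)
    except Exception as e:
        error_message = "Generate embedding error:{}".format(str(e))
        progress_callback(-1, error_message)  # the same text reaches the UI...
        logging.exception(error_message)      # ...and the log, with the traceback attached
        raise
    return token_count, vector_size
```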
