Skip to content

Commit

Permalink
add taskexecutor status check (infiniflow#2038)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?


### Type of change

- [x] New Feature (non-breaking change which adds functionality)
  • Loading branch information
KevinHuSh authored Aug 21, 2024
1 parent a7a1362 commit 6ac88ee
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
2 changes: 1 addition & 1 deletion api/apps/canvas_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,4 @@ def test_db_connect():
db.close()
return get_json_result(retmsg="Database Connection Successful!")
except Exception as e:
return server_error_response(str(e))
return server_error_response(e)
18 changes: 18 additions & 0 deletions api/apps/system_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License
#
import json

from flask_login import login_required

from api.db.services.knowledgebase_service import KnowledgebaseService
Expand Down Expand Up @@ -65,4 +67,20 @@ def status():
except Exception as e:
res["redis"] = {"status": "red", "elapsed": "{:.1f}".format((timer() - st)*1000.), "error": str(e)}

try:
obj = json.loads(REDIS_CONN.get("TASKEXE"))
color = "green"
for id in obj.keys():
arr = obj[id]
if len(arr) == 1:
obj[id] = [0]
else:
obj[id] = [arr[i+1]-arr[i] for i in range(len(arr)-1)]
elapsed = max(obj[id])
if elapsed > 50: color = "yellow"
if elapsed > 120: color = "red"
res["task_executor"] = {"status": color, "elapsed": obj}
except Exception as e:
res["task_executor"] = {"status": "red", "error": str(e)}

return get_json_result(data=res)
18 changes: 18 additions & 0 deletions rag/svr/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import sys
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from functools import partial

from api.db.services.file2document_service import File2DocumentService
Expand Down Expand Up @@ -373,11 +374,28 @@ def main():
r["id"], tk_count, len(cks), timer() - st))


def report_status():
id = "0" if len(sys.argv) < 2 else sys.argv[1]
while True:
try:
obj = REDIS_CONN.get("TASKEXE")
obj = json.load(obj)
if id not in obj: obj[id] = []
obj[id].append(timer()*1000)
obj[id] = obj[id][:-60]
REDIS_CONN.set_obj("TASKEXE", obj)
except Exception as e:
print("[Exception]:", str(e))
time.sleep(60)

if __name__ == "__main__":
peewee_logger = logging.getLogger('peewee')
peewee_logger.propagate = False
peewee_logger.addHandler(database_logger.handlers[0])
peewee_logger.setLevel(database_logger.level)

exe = ThreadPoolExecutor(max_workers=1)
exe.submit(report_status)

while True:
main()

0 comments on commit 6ac88ee

Please sign in to comment.