Skip to content

Commit

Permalink
optimization: 支持日志状态表的删除 (closed TencentBlueKing#1348)
Browse files Browse the repository at this point in the history
  • Loading branch information
CohleRustW committed May 10, 2023
1 parent 0bed2d8 commit 1f5dc39
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 7 deletions.
42 changes: 35 additions & 7 deletions apps/backend/subscription/task_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""
import logging
import traceback
import typing
from collections import defaultdict
from typing import Any, Dict, List

Expand Down Expand Up @@ -58,7 +59,9 @@ def list_pipeline_processes(pipeline_id: str) -> Dict[str, List[Dict]]:
return pipeline_processes

@staticmethod
def collect_status(steps: List[Dict[str, Any]]) -> str:
def collect_status(
steps: List[Dict[str, Any]], backtrace_failed_steps=False
) -> typing.Union[str, typing.Tuple[str, List[Dict[str, Any]]]]:
status_set = set([sub_step["status"] for sub_step in steps])
status = constants.JobStatusType.PENDING
if len(status_set) == 1 and list(status_set)[0] == constants.JobStatusType.SUCCESS:
Expand All @@ -67,6 +70,20 @@ def collect_status(steps: List[Dict[str, Any]]) -> str:
status = constants.JobStatusType.FAILED
elif constants.JobStatusType.RUNNING in status_set:
status = constants.JobStatusType.RUNNING

if backtrace_failed_steps:
# 如果 steps 中都是 Pending 那返回的 status 也是 Pending
if status_set == {constants.JobStatusType.PENDING}:
return status, steps
else:
# 填充 steps 中的状态
for step in steps:
if step["status"] in [constants.JobStatusType.FAILED, constants.JobStatusType.RUNNING]:
break
else:
step["status"] = constants.JobStatusType.SUCCESS
return status, steps

return status

@classmethod
Expand Down Expand Up @@ -199,7 +216,8 @@ def get_subscription_task_instance_status(
if not sub_steps:
continue

status = cls.collect_status(sub_steps)
# 子步骤状态聚合 并且填充失败前的子步骤节点状态
status, sub_steps = cls.collect_status(steps=sub_steps, backtrace_failed_steps=True)
last_finish_sub_step = last_finish_sub_step or sub_steps[-1]

finish_time = None
Expand Down Expand Up @@ -229,11 +247,21 @@ def get_subscription_task_instance_status(
start_node_index = next_node_index

if instance_status["steps"]:
instance_status.update(
status=cls.collect_status(instance_status["steps"]),
start_time=instance_status["steps"][0]["start_time"],
finish_time=instance_status["steps"][-1]["finish_time"],
)
# 如果 instance_record_obj 的状态不是 PENDING,且聚合的状态是 PENDING 则从 instance_record 获取状态
instance_record_status = instance_record_obj.status
collect_status = cls.collect_status(instance_status["steps"])
if (
instance_record_status != constants.JobStatusType.PENDING
and collect_status == constants.JobStatusType.PENDING
):
status = instance_record_status
start_time = strftime_local(instance_record_obj.create_time)
finish_time = strftime_local(instance_record_obj.update_time)
else:
status = cls.collect_status(instance_status["steps"])
start_time = instance_status["steps"][0]["start_time"]
finish_time = instance_status["steps"][-1]["finish_time"]
instance_status.update(status=status, start_time=start_time, finish_time=finish_time)
else:
# 没有执行步骤,从instance_record获取状态
instance_status["status"] = instance_record_obj.status
Expand Down
14 changes: 14 additions & 0 deletions apps/node_man/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,20 @@ def get_choices(cls):
(cls.IGNORED, _("已忽略")),
)

@classmethod
def get_member_value__alias_map(cls) -> Dict[str, str]:
return {
cls.PENDING: _("等待执行"),
cls.RUNNING: _("正在执行"),
cls.SUCCESS: _("执行成功"),
cls.FAILED: _("执行失败"),
cls.PART_FAILED: _("部分失败"),
cls.TERMINATED: _("已终止"),
cls.REMOVED: _("已移除"),
cls.FILTERED: _("被过滤的"),
cls.IGNORED: _("已忽略"),
}


NODE_MAN_LOG_LEVEL = ("INFO", "DEBUG", "PRIMARY", "WARNING", "ERROR")

Expand Down
11 changes: 11 additions & 0 deletions apps/node_man/tools/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ def get_current_step_display(cls, instance_status: Dict[str, Any]) -> Dict[str,
if last_step["status"] == constants.JobStatusType.SUCCESS:
return {"node_id": last_step.get("index"), "status_display": _("执行成功"), "step": last_step["node_name"]}
else:
status: str = instance_status["status"]
if (
set([step["status"] for step in sub_steps]) == {constants.JobStatusType.PENDING}
and status != constants.JobStatusType.PENDING
):
job_status_map: Dict[str, str] = constants.JobStatusType.get_member_value__alias_map()
return {
"node_id": -1,
"status_display": _("{}").format(job_status_map[status]),
"step": last_step["node_name"],
}
return {"node_id": -1, "status_display": _("等待执行"), "step": last_step["node_name"]}

@classmethod
Expand Down

0 comments on commit 1f5dc39

Please sign in to comment.