Skip to content

Commit

Permalink
feat: 流控
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Jul 13, 2024
1 parent b00d9dd commit 364ab82
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 438 deletions.
154 changes: 0 additions & 154 deletions apps/backend/subscription/agent_tasks.py

This file was deleted.

6 changes: 6 additions & 0 deletions apps/backend/subscription/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,9 @@ class SubscriptionIncludeGrayBizError(AppBaseException):
ERROR_CODE = 19
MESSAGE = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击")
MESSAGE_TPL = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击")


class SubscriptionInstanceCountMismatchError(AppBaseException):
ERROR_CODE = 20
MESSAGE = _("订阅实例数量不匹配")
MESSAGE_TPL = _("订阅实例数量不匹配")
10 changes: 0 additions & 10 deletions apps/backend/subscription/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

from apps.backend.subscription import errors, task_tools, tasks, tools
from apps.backend.subscription.errors import InstanceTaskIsRunning
from apps.backend.utils.pipeline_parser import PipelineParser
from apps.core.concurrent import controller
from apps.node_man import constants, models
from apps.utils import concurrent
Expand Down Expand Up @@ -407,18 +406,9 @@ def task_result_detail(self, instance_id: str, task_id_list: List[int] = None) -

# 选取指定 subscription_id & task_id_list范围下最新的instance_record
instance_record = models.SubscriptionInstanceRecord.objects.filter(**filter_kwargs).order_by("-id").first()

if not instance_record:
raise errors.SubscriptionInstanceRecordNotExist()

# 兼容Agent任务通过改造前逻辑获取任务状态
if not instance_record.subscription_task.pipeline_id:
pipeline_parser = PipelineParser([instance_record.pipeline_id])
instance_status = tools.get_subscription_task_instance_status(
instance_record, pipeline_parser, need_detail=True
)
return instance_status

instance_status_list = task_tools.TaskResultTools.list_subscription_task_instance_status(
instance_records=[instance_record], need_detail=True
)
Expand Down
8 changes: 5 additions & 3 deletions apps/backend/subscription/steps/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,15 +594,17 @@ def handle_check_and_skip_instances(
:param instances:
:return:
"""
host_id__instance_id_map: Dict[int, str] = {}
gse_version__host_info_list_map: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
host_id__instance_id_map: Dict[int, Dict] = {}
gse_version__bk_host_id__host_map: Dict[str, Dict[int, models.Host]] = defaultdict(dict)

for instance_id, instance in instances.items():
bk_host_id = instance["host"].get("bk_host_id")
# dict in 是近似 O(1) 的操作复杂度:https://stackoverflow.com/questions/17539367/
if bk_host_id not in bk_host_id__host_map:
# 忽略未同步主机
continue

host_obj = bk_host_id__host_map[bk_host_id]
gse_version__host_info_list_map[instance["meta"]["GSE_VERSION"]].append(
{
Expand All @@ -627,11 +629,11 @@ def handle_check_and_skip_instances(
extra_meta_data={},
)
)

logger.info(f"agent_id__readable_proc_status_map -> {agent_id__readable_proc_status_map}")

for gse_version, _bk_host_id__host_map in gse_version__bk_host_id__host_map.items():
gse_api_helper = get_gse_api_helper(gse_version)

for bk_host_id, host_obj in _bk_host_id__host_map.items():
agent_id: str = gse_api_helper.get_agent_id(host_obj)
instance_id: str = host_id__instance_id_map[bk_host_id]
Expand All @@ -650,7 +652,6 @@ def handle_check_and_skip_instances(
"current_version": proc_status["version"],
"target_version": self.get_matching_pkg_real_version(host_obj.os_type, host_obj.cpu_arch),
}

if base_reason_info["status"] != constants.ProcStateType.RUNNING:
# 插件状态异常时进行重装
instance_actions[host_id__instance_id_map[bk_host_id]] = install_action
Expand Down Expand Up @@ -678,6 +679,7 @@ def filter_related_process_statuses(self, auto_trigger: bool) -> QuerySet:
"""
if self.subscription_step.subscription_id in [None, -1]:
return models.ProcessStatus.objects.none()

# 默认仅查出需要使用的非 JSON 字段
# 此处不考虑用 only,only 隐式仅加载某些字段,容易在引用对象处造成 n+1 查询
statuses = models.ProcessStatus.objects.filter(
Expand Down
Loading

0 comments on commit 364ab82

Please sign in to comment.