Skip to content

Commit

Permalink
fix: 解决给主机添加process时并发请求数量过大 (closed #2223)
Browse files Browse the repository at this point in the history
  • Loading branch information
ping15 authored and wyyalt committed Jun 17, 2024
1 parent 550677e commit 45e4ab8
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,6 @@ def handle_update_cmdb_hosts_case(
sub_inst.update_time = timezone.now()
host_info: Dict[str, Any] = sub_inst.instance_info["host"]
properties: Dict[str, Any] = {
"bk_cloud_id": host_info["bk_cloud_id"],
"bk_addressing": host_info.get("bk_addressing", constants.CmdbAddressingType.STATIC.value),
"bk_host_innerip": host_info.get("bk_host_innerip", "")
if host_info["bk_host_id"] not in host_ids_with_mutil_inner_ip
else "",
Expand Down
16 changes: 16 additions & 0 deletions apps/backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,19 @@ def needs_batch_request(self) -> bool:
return False

return True


class CMDBInstanceChoices(enum.EnhanceEnum):
HOST = "cmdb_instance.host"
SCOPE = "cmdb_instance.scope"
PROCESS = "cmdb_instance.process"
SERVICE = "cmdb_instance.service"

@classmethod
def _get_member__alias_map(cls) -> Dict[Enum, str]:
return {
cls.HOST: _("cmdb_instance.host参数"),
cls.SCOPE: _("cmdb_instance.scope参数"),
cls.PROCESS: _("cmdb_instance.process参数"),
cls.SERVICE: _("cmdb_instance.service参数"),
}
4 changes: 3 additions & 1 deletion apps/backend/periodic_tasks/cache_scope_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def get_instances_by_scope_task(subscription_id):
f" scope_md5: {scope_md5}, scope: {subscription.scope}"
)
# 查询后会进行缓存,详见 get_instances_by_scope 的装饰器 func_cache_decorator
tools.get_instances_by_scope(subscription.scope, source="get_instances_by_scope_task")
tools.get_instances_by_scope_with_checker(
subscription.scope, subscription.steps, source="get_instances_by_scope_task"
)
logger.info(f"[cache_subscription_scope_instances] (subscription: {subscription_id}) end.")


Expand Down
12 changes: 8 additions & 4 deletions apps/backend/subscription/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ def task_result(
# 如果不需要已不在订阅范围内的执行快照,查询订阅范围过滤掉移除的实例 ID
subscription = models.Subscription.objects.get(id=self.subscription_id)
scope_instance_id_list: Set[str] = set(
tools.get_instances_by_scope(subscription.scope, get_cache=True, source="task_result").keys()
tools.get_instances_by_scope_with_checker(
subscription.scope, subscription.steps, get_cache=True, source="task_result"
).keys()
)
base_kwargs["instance_id__in"] = scope_instance_id_list

Expand Down Expand Up @@ -506,7 +508,9 @@ def statistic(subscription_id_list: List[int]) -> List[Dict]:
sub_statistic_list: List[Dict] = []
for subscription in subscriptions:
sub_statistic = {"subscription_id": subscription.id, "status": []}
current_instances = tools.get_instances_by_scope(subscription.scope, get_cache=True, source="statistic")
current_instances = tools.get_instances_by_scope_with_checker(
subscription.scope, subscription.steps, get_cache=True, source="statistic"
)

status_statistic = {"SUCCESS": 0, "PENDING": 0, "FAILED": 0, "RUNNING": 0}
plugin_versions = defaultdict(lambda: defaultdict(int))
Expand Down Expand Up @@ -618,8 +622,8 @@ def instance_status(subscription_id_list: List[int], show_task_detail: bool) ->
result = []
for subscription in subscriptions:
subscription_result = []
current_instances = tools.get_instances_by_scope(
subscription.scope, get_cache=True, source="instance_status"
current_instances = tools.get_instances_by_scope_with_checker(
subscription.scope, subscription.steps, get_cache=True, source="instance_status"
)

# 对于每个instance,通过group_id找到其对应的host_status
Expand Down
11 changes: 8 additions & 3 deletions apps/backend/subscription/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,9 +595,13 @@ def run_subscription_task_and_create_instance(
scope["bk_biz_id"] = subscription.bk_biz_id

# 获取订阅范围内全部实例
instances = tools.get_instances_by_scope(scope, source="run_subscription_task_and_create_instance")
steps = subscription.steps
instances = tools.get_instances_by_scope_with_checker(
scope, steps, source="run_subscription_task_and_create_instance"
)
logger.info(
"[sub_lifecycle<sub(%s), task(%s)>][run_subscription_task_and_create_instance] get_instances_by_scope -> %s",
"[sub_lifecycle<sub(%s), task(%s)>][run_subscription_task_and_create_instance] "
"get_instances_by_scope_with_checker -> %s",
subscription.id,
subscription_task.id,
len(instances),
Expand Down Expand Up @@ -698,13 +702,14 @@ def run_subscription_task_and_create_instance(
subscription_task.id,
deleted_id_not_in_scope,
)
deleted_instance_info = tools.get_instances_by_scope(
deleted_instance_info = tools.get_instances_by_scope_with_checker(
{
"bk_biz_id": subscription.bk_biz_id,
"object_type": subscription.object_type,
"node_type": models.Subscription.NodeType.INSTANCE,
"nodes": deleted_id_not_in_scope,
},
steps,
source="find_deleted_instances",
)

Expand Down
54 changes: 48 additions & 6 deletions apps/backend/subscription/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import copy
import hashlib
import ipaddress
import json
import logging
import math
import os
Expand All @@ -27,7 +28,7 @@
from django.utils import timezone

from apps.backend.components.collections import core
from apps.backend.constants import FilterFieldName, InstNodeType
from apps.backend.constants import CMDBInstanceChoices, FilterFieldName, InstNodeType
from apps.backend.subscription import task_tools
from apps.backend.subscription.commons import get_host_by_inst, list_biz_hosts
from apps.backend.subscription.constants import SUBSCRIPTION_SCOPE_CACHE_TIME
Expand Down Expand Up @@ -305,7 +306,13 @@ def get_service_instances(
filter_field_name.value: filter_id_list,
}
if filter_field_name.needs_batch_request:
result = batch_request(CCApi.list_service_instance_detail, params, sort="id")
result = batch_request(
CCApi.list_service_instance_detail,
params,
sort="id",
limit=constants.LIST_SERVICE_INSTANCE_DETAIL_LIMIT,
interval=constants.LIST_SERVICE_INSTANCE_DETAIL_INTERVAL,
)
else:
params["page"] = {
"start": 0,
Expand Down Expand Up @@ -390,14 +397,23 @@ def get_service_instance_by_inst(bk_biz_id, inst_list, module_to_topo):
"bk_module_id": bk_module_id,
},
"sort": "id",
"limit": constants.LIST_SERVICE_INSTANCE_DETAIL_LIMIT,
}
for bk_module_id in module_ids
]

service_instances = batch_call(batch_request, params, extend_result=True)
service_instances = batch_call(
batch_request, params, extend_result=True, interval=constants.LIST_SERVICE_INSTANCE_DETAIL_INTERVAL
)
else:
params = {"bk_biz_id": int(bk_biz_id), "with_name": True}
service_instances = batch_request(CCApi.list_service_instance_detail, params, sort="id")
service_instances = batch_request(
CCApi.list_service_instance_detail,
params,
sort="id",
limit=constants.LIST_SERVICE_INSTANCE_DETAIL_LIMIT,
interval=constants.LIST_SERVICE_INSTANCE_DETAIL_INTERVAL,
)

service_instances = [
service_instance for service_instance in service_instances if service_instance["bk_module_id"] in module_ids
Expand Down Expand Up @@ -494,7 +510,13 @@ def get_service_instances_by_template(bk_obj_id, template_info_list: list, bk_bi
params = dict(bk_set_template_ids=template_ids, bk_biz_id=int(bk_biz_id), fields=("bk_host_id", "bk_cloud_id"))
host_info_result = batch_request(call_func, params)
bk_host_ids = [inst["bk_host_id"] for inst in host_info_result]
all_service_instances = batch_request(CCApi.list_service_instance_detail, params, sort="id")
all_service_instances = batch_request(
CCApi.list_service_instance_detail,
params,
sort="id",
limit=constants.LIST_SERVICE_INSTANCE_DETAIL_LIMIT,
interval=constants.LIST_SERVICE_INSTANCE_DETAIL_INTERVAL,
)
service_instances = [instance for instance in all_service_instances if instance["bk_host_id"] in bk_host_ids]

return service_instances
Expand Down Expand Up @@ -764,6 +786,7 @@ def wrapper(scope: Dict[str, Union[Dict, Any]], *args, **kwargs) -> Dict[str, Di
"node_type": scope["node_type"],
"nodes": list(nodes),
"instance_selector": scope.get("instance_selector"),
"with_info": scope["with_info"],
},
**kwargs,
}
Expand Down Expand Up @@ -796,6 +819,23 @@ def get_scope_labels_func(
}


def get_instances_by_scope_with_checker(
scope: Dict[str, Union[Dict, int, Any]], steps: List[models.SubscriptionStep], *args, **kwargs
) -> Dict[str, Dict[str, Union[Dict, Any]]]:

if "with_info" in scope:
scope["with_info"]["process"] = False
else:
scope["with_info"] = {"process": False}

for step in steps:
if CMDBInstanceChoices.PROCESS.value in json.dumps(step.params):
scope["with_info"]["process"] = True
break

return get_instances_by_scope(scope, *args, **kwargs)


@support_multi_biz
@SetupObserve(histogram=metrics.app_task_get_instances_by_scope_duration_seconds, get_labels_func=get_scope_labels_func)
@FuncCacheDecorator(cache_time=SUBSCRIPTION_SCOPE_CACHE_TIME)
Expand Down Expand Up @@ -924,7 +964,9 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str,

add_host_info_to_instances(bk_biz_id, scope, instances)
add_scope_info_to_instances(nodes, scope, instances, module_to_topo)
add_process_info_to_instances(bk_biz_id, scope, instances)

if scope["with_info"]["process"]:
add_process_info_to_instances(bk_biz_id, scope, instances)

instances_dict = {}
data = {
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/tests/subscription/test_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class TestPerformance(TestCase):

@classmethod
def mock_get_instance_by_scope(cls, instance_num):
def get_instances_by_scope(scope):
def get_instances_by_scope(scope, *args, **kwargs):
if scope["nodes"]:
mocked_instances = {}
for i in range(instance_num):
Expand Down
Loading

0 comments on commit 45e4ab8

Please sign in to comment.