diff --git a/apps/adapters/__init__.py b/apps/adapters/__init__.py new file mode 100644 index 000000000..29ed269e0 --- /dev/null +++ b/apps/adapters/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/apps/adapters/api/__init__.py b/apps/adapters/api/__init__.py new file mode 100644 index 000000000..29ed269e0 --- /dev/null +++ b/apps/adapters/api/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/apps/adapters/api/gse/__init__.py b/apps/adapters/api/gse/__init__.py new file mode 100644 index 000000000..c2b3866d6 --- /dev/null +++ b/apps/adapters/api/gse/__init__.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. 
+""" +import typing + +from django.conf import settings + +from common.api import GseApi +from common.log import logger +from env import constants + +from .base import GseApiBaseHelper +from .v1 import GseV1ApiHelper +from .v2 import GseV2ApiHelper + +GSE_HELPERS: typing.Dict[str, typing.Type[GseApiBaseHelper]] = { + constants.GseVersion.V1.value: GseV1ApiHelper, + constants.GseVersion.V2.value: GseV2ApiHelper, +} + + +def get_gse_api_helper(gse_version: str) -> typing.Type[GseApiBaseHelper]: + if gse_version not in GSE_HELPERS: + logger.warning( + f"Get GseApiHelper failed: " + f"unsupported gse_version -> {gse_version}, options -> {GSE_HELPERS.values()}; " + f"use default -> {constants.GseVersion.V1.value}" + ) + gse_version = constants.GseVersion.V1.value + return GSE_HELPERS[gse_version] + + +GseApiHelper: GseApiBaseHelper = get_gse_api_helper(settings.GSE_VERSION)( + version=settings.GSE_VERSION, gse_api_obj=GseApi +) diff --git a/apps/adapters/api/gse/base.py b/apps/adapters/api/gse/base.py new file mode 100644 index 000000000..a04ba8344 --- /dev/null +++ b/apps/adapters/api/gse/base.py @@ -0,0 +1,200 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. 
+""" +import abc +import typing +from collections import ChainMap + +from apps.core.concurrent import controller +from apps.node_man import constants, models +from apps.utils import concurrent +from common.api import GseApi + +InfoDict = typing.Dict[str, typing.Any] +InfoDictList = typing.List[InfoDict] +AgentIdInfoMap = typing.Dict[str, InfoDict] + + +class GseApiBaseHelper(abc.ABC): + + version: str = None + gse_api_obj = None + + def __init__(self, version: str, gse_api_obj=GseApi): + self.version = version + self.gse_api_obj = gse_api_obj + + @abc.abstractmethod + def get_agent_id(self, mixed_types_of_host_info: typing.Union[InfoDict, models.Host]) -> str: + """ + 获取 Agent 唯一标识 + :param mixed_types_of_host_info: 携带主机信息的混合类型对象 + :return: + """ + raise NotImplementedError + + @abc.abstractmethod + def _list_proc_state( + self, + namespace: str, + proc_name: str, + labels: InfoDict, + host_info_list: InfoDictList, + extra_meta_data: InfoDict, + **options, + ) -> AgentIdInfoMap: + """ + 获取进程状态信息 + :param namespace: 命名空间 + :param proc_name: 进程名称 + :param labels: 标签 + :param host_info_list: 主机信息列表 + :param extra_meta_data: 额外的元数据 + :param options: 其他可能需要的参数 + :return: + """ + raise NotImplementedError + + @abc.abstractmethod + def _list_agent_state(self, host_info_list: InfoDictList) -> AgentIdInfoMap: + """ + 获取 Agent 状态信息 + :param host_info_list: AgentId - Agent 状态信息映射关系 + :return: + """ + raise NotImplementedError + + @abc.abstractmethod + def preprocessing_proc_operate_info(self, host_info_list: InfoDictList, proc_operate_info: InfoDict) -> InfoDict: + """ + 进程操作信息预处理 + :param host_info_list: 主机信息列表 + :param proc_operate_info: 进程操作信息 + :return: + """ + raise NotImplementedError + + @abc.abstractmethod + def _operate_proc_multi(self, proc_operate_req: InfoDictList, **options) -> str: + """ + 批量进程操作 + :param proc_operate_req: 进程操作信息列表 + :param options: 其他可能需要的参数 + :return: + """ + raise NotImplementedError + + @abc.abstractmethod + def get_proc_operate_result(self, task_id: str) -> InfoDict: + """ + 获取进程操作结果 + :param task_id: GSE 任务 ID + :return: + """ + raise NotImplementedError + + @staticmethod + def get_version(version_str: typing.Optional[str]) -> str: + version_match: typing.Optional[typing.Match] = constants.VERSION_PATTERN.search(version_str or "") + return version_match.group() if version_match else "" + + def get_gse_proc_key( + self, mixed_types_of_host_info: typing.Union[InfoDict, models.Host], namespace: str, proc_name: str, **options + ) -> str: + """ + 获取进程唯一标识 + :param mixed_types_of_host_info: 携带主机信息的混合类型对象 + :param namespace: 命名空间 + :param proc_name: 进程名称 + :param options: 其他可能需要的关键字参数 + :return: + """ + return f"{self.get_agent_id(mixed_types_of_host_info)}:{namespace}:{proc_name}" + + def list_proc_state( + self, + namespace: str, + proc_name: str, + labels: InfoDict, + host_info_list: InfoDictList, + extra_meta_data: InfoDict, + **options, + ) -> AgentIdInfoMap: + """ + 获取进程状态信息 + :param namespace: 命名空间 + :param proc_name: 进程名称 + :param labels: 标签 + :param host_info_list: 主机信息列表 + :param extra_meta_data: 额外的元数据 + :param options: 其他可能需要的参数 + :return: + """ + + @controller.ConcurrentController( + data_list_name="_host_info_list", + batch_call_func=concurrent.batch_call, + extend_result=False, + get_config_dict_func=lambda: {"limit": constants.QUERY_PROC_STATUS_HOST_LENS}, + ) + def get_proc_status_inner( + _namespace: str, + _proc_name: str, + _labels: InfoDict, + _host_info_list: InfoDictList, + _extra_meta_data: InfoDict, + **_options, + ) -> 
typing.List[AgentIdInfoMap]: + return self._list_proc_state(_namespace, _proc_name, _labels, _host_info_list, _extra_meta_data, **_options) + + agent_id__proc_status_info_map_list: typing.List[AgentIdInfoMap] = get_proc_status_inner( + _namespace=namespace, + _proc_name=proc_name, + _labels=labels, + _host_info_list=host_info_list, + _extra_meta_data=extra_meta_data, + **options, + ) + return dict(ChainMap(*agent_id__proc_status_info_map_list)) + + def list_agent_state(self, host_info_list: InfoDictList) -> AgentIdInfoMap: + """ + 获取 Agent 状态信息 + 版本 / 状态 + :param host_info_list: AgentId - Agent 状态信息映射关系 + :return: + """ + + @controller.ConcurrentController( + data_list_name="_host_info_list", + batch_call_func=concurrent.batch_call, + extend_result=False, + get_config_dict_func=lambda: {"limit": constants.QUERY_AGENT_STATUS_HOST_LENS}, + ) + def list_agent_state_inner(_host_info_list: InfoDictList) -> typing.List[AgentIdInfoMap]: + return self._list_agent_state(_host_info_list) + + agent_id__agent_state_info_map_list: typing.List[AgentIdInfoMap] = list_agent_state_inner( + _host_info_list=host_info_list + ) + return dict(ChainMap(*agent_id__agent_state_info_map_list)) + + def operate_proc_multi(self, proc_operate_req: InfoDictList, **options) -> str: + """ + 批量进程操作 + :param proc_operate_req: 进程操作信息列表 + :param options: 其他可能需要的参数 + :return: + """ + preprocessed_proc_operate_req: InfoDictList = [] + for proc_operate_info in proc_operate_req: + hosts = proc_operate_info.pop("hosts") + preprocessed_proc_operate_req.append(self.preprocessing_proc_operate_info(hosts, proc_operate_info)) + return self._operate_proc_multi(preprocessed_proc_operate_req, **options) diff --git a/apps/adapters/api/gse/v1.py b/apps/adapters/api/gse/v1.py new file mode 100644 index 000000000..c02bd7197 --- /dev/null +++ b/apps/adapters/api/gse/v1.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import typing +from collections import Mapping + +from apps.node_man import constants, models + +from . 
import base + + +class GseV1ApiHelper(base.GseApiBaseHelper): + def get_agent_id(self, mixed_types_of_host_info: typing.Union[base.InfoDict, models.Host]) -> str: + if isinstance(mixed_types_of_host_info, Mapping): + if "host" in mixed_types_of_host_info: + return self.get_agent_id(mixed_types_of_host_info["host"]) + bk_cloud_id: int = mixed_types_of_host_info["bk_cloud_id"] + for attr_name in ["ip", "inner_ip", "bk_host_innerip"]: + ip = mixed_types_of_host_info.get(attr_name, "") + if ip: + break + if not ip: + raise ValueError("can not get ip in dict type host info") + else: + bk_cloud_id: typing.Optional[int] = getattr(mixed_types_of_host_info, "bk_cloud_id") + if bk_cloud_id is None: + raise ValueError("can not get bk_cloud_id in obj type host info") + for attr_name in ["inner_ip"]: + ip = getattr(mixed_types_of_host_info, attr_name, "") + if ip: + break + if not ip: + raise ValueError("can not get ip in obj type host info") + + return f"{bk_cloud_id}:{ip}" + + def _list_proc_state( + self, + namespace: str, + proc_name: str, + labels: base.InfoDict, + host_info_list: base.InfoDictList, + extra_meta_data: base.InfoDict, + **options, + ) -> base.AgentIdInfoMap: + hosts: base.InfoDictList = [ + {"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]} for host_info in host_info_list + ] + query_params: base.InfoDict = { + "meta": {"namespace": constants.GSE_NAMESPACE, "name": proc_name, "labels": {"proc_name": proc_name}}, + "hosts": hosts, + } + # 接口测试 + # 200: 0.3s-0.4s (0.41s 0.29s 0.33s) + # 500: 0.35s-0.5s (0.34s 0.42s 0.51s) + # 1000: 0.5s-0.6s (0.53s 0.56s 0.61s) + # 2000: 0.9s (0.91s, 0.93s) + # 5000: 2s-4s (2.3s 2.2s 4.1s 4.3s) + proc_infos: base.InfoDictList = self.gse_api_obj.get_proc_status(query_params).get("proc_infos") or [] + agent_id__proc_info_map: base.AgentIdInfoMap = {} + for proc_info in proc_infos: + agent_id__proc_info_map[self.get_agent_id(proc_info)] = { + "version": self.get_version(proc_info.get("version")), + "status": constants.PLUGIN_STATUS_DICT[proc_info["status"]], + "is_auto": constants.AutoStateType.AUTO if proc_info["isauto"] else constants.AutoStateType.UNAUTO, + "name": proc_info["meta"]["name"], + } + return agent_id__proc_info_map + + def _list_agent_state(self, host_info_list: base.InfoDictList) -> base.AgentIdInfoMap: + hosts: base.InfoDictList = [ + {"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]} for host_info in host_info_list + ] + agent_id__info_map: base.AgentIdInfoMap = self.gse_api_obj.get_agent_info({"hosts": hosts}) + agent_id__status_map: base.AgentIdInfoMap = self.gse_api_obj.get_agent_status({"hosts": hosts}) + + for agent_id, agent_info in agent_id__info_map.items(): + agent_info.update(agent_id__status_map.get(agent_id) or {}) + agent_info["version"] = self.get_version(agent_info.get("version")) + agent_info["bk_agent_alive"] = agent_info.get("bk_agent_alive") or constants.BkAgentStatus.NOT_ALIVE.value + + return agent_id__info_map + + def preprocessing_proc_operate_info( + self, host_info_list: base.InfoDictList, proc_operate_info: base.InfoDict + ) -> base.InfoDict: + proc_operate_info["hosts"] = [ + {"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]} for host_info in host_info_list + ] + return proc_operate_info + + def _operate_proc_multi(self, proc_operate_req: base.InfoDictList, **options) -> str: + return self.gse_api_obj.operate_proc_multi({"proc_operate_req": proc_operate_req})["task_id"] + + def get_proc_operate_result(self, task_id: str) -> base.InfoDict: + return 
self.gse_api_obj.get_proc_operate_result({"task_id": task_id}, raw=True) diff --git a/apps/adapters/api/gse/v2.py b/apps/adapters/api/gse/v2.py new file mode 100644 index 000000000..396300e0a --- /dev/null +++ b/apps/adapters/api/gse/v2.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import typing +from collections import Mapping + +from apps.node_man import constants, models + +from . import base +from .v1 import GseV1ApiHelper + + +class GseV2ApiHelper(GseV1ApiHelper): + def get_agent_id(self, mixed_types_of_host_info: typing.Union[base.InfoDict, models.Host]) -> str: + if isinstance(mixed_types_of_host_info, Mapping): + if "host" in mixed_types_of_host_info: + return self.get_agent_id(mixed_types_of_host_info["host"]) + bk_agent_id: typing.Optional[str] = mixed_types_of_host_info.get("bk_agent_id") + else: + bk_agent_id: typing.Optional[int] = getattr(mixed_types_of_host_info, "bk_agent_id") + if not bk_agent_id: + return super(GseV2ApiHelper, self).get_agent_id(mixed_types_of_host_info) + return bk_agent_id + + def _list_proc_state( + self, + namespace: str, + proc_name: str, + labels: base.InfoDict, + host_info_list: base.InfoDictList, + extra_meta_data: base.InfoDict, + **options + ) -> base.AgentIdInfoMap: + agent_id_list: typing.List[str] = [self.get_agent_id(host_info) for host_info in host_info_list] + query_params: base.InfoDict = { + "meta": {"namespace": constants.GSE_NAMESPACE, "name": proc_name, "labels": {"proc_name": proc_name}}, + "agent_id_list": agent_id_list, + } + proc_infos: base.InfoDictList = ( + self.gse_api_obj.v2_proc_get_proc_status_v2(query_params).get("proc_infos") or [] + ) + agent_id__proc_info_map: base.AgentIdInfoMap = {} + for proc_info in proc_infos: + proc_info["host"]["bk_agent_id"] = proc_info["bk_agent_id"] + agent_id__proc_info_map[self.get_agent_id(proc_info)] = { + "version": self.get_version(proc_info.get("version")), + "status": constants.PLUGIN_STATUS_DICT[proc_info["status"]], + "is_auto": constants.AutoStateType.AUTO if proc_info["isauto"] else constants.AutoStateType.UNAUTO, + "name": proc_info["meta"]["name"], + } + return agent_id__proc_info_map + + def _list_agent_state(self, host_info_list: base.InfoDictList) -> base.AgentIdInfoMap: + agent_id_list: typing.List[str] = [self.get_agent_id(host_info) for host_info in host_info_list] + agent_state_list: base.InfoDictList = self.gse_api_obj.v2_cluster_list_agent_state( + {"agent_id_list": agent_id_list} + ) + agent_id__state_map: base.AgentIdInfoMap = {} + for agent_state in agent_state_list: + status_code: typing.Optional[int] = agent_state.pop("status_code", None) + if status_code == constants.GseAgentStatusCode.RUNNING.value: + agent_state["bk_agent_alive"] = constants.BkAgentStatus.ALIVE.value + else: + agent_state["bk_agent_alive"] = constants.BkAgentStatus.NOT_ALIVE.value + 
agent_id__state_map[agent_state["bk_agent_id"]] = agent_state + return agent_id__state_map + + def preprocessing_proc_operate_info( + self, host_info_list: base.InfoDictList, proc_operate_info: base.InfoDict + ) -> base.InfoDict: + proc_operate_info["agent_id_list"] = [self.get_agent_id(host_info) for host_info in host_info_list] + return proc_operate_info + + def _operate_proc_multi(self, proc_operate_req: base.InfoDictList, **options) -> str: + return self.gse_api_obj.v2_proc_operate_proc_multi({"proc_operate_req": proc_operate_req})["task_id"] + + def get_proc_operate_result(self, task_id: str) -> base.InfoDict: + return self.gse_api_obj.v2_proc_get_proc_operate_result_v2({"task_id": task_id}, raw=True) diff --git a/apps/backend/agent/manager.py b/apps/backend/agent/manager.py index a38bd674a..287f55052 100644 --- a/apps/backend/agent/manager.py +++ b/apps/backend/agent/manager.py @@ -185,20 +185,6 @@ def start_nginx(cls): act.component.inputs.script_param = Var(type=Var.PLAIN, value="") return act - @classmethod - def delegate_plugin(cls, plugin_name: str): - """ - 托管插件 - :param plugin_name: 插件名称 - :return: - """ - act = AgentServiceActivity( - component_code=components.DelegatePluginProcComponent.code, - name=_("托管 {plugin_name} 插件进程").format(plugin_name=plugin_name), - ) - act.component.inputs.plugin_name = Var(type=Var.PLAIN, value=plugin_name) - return act - @classmethod def render_and_push_gse_config(cls, name=components.RenderAndPushGseConfigComponent.name): """渲染并下载 agent 配置""" diff --git a/apps/backend/components/collections/agent_new/components.py b/apps/backend/components/collections/agent_new/components.py index 922eaaf6e..9334acfea 100644 --- a/apps/backend/components/collections/agent_new/components.py +++ b/apps/backend/components/collections/agent_new/components.py @@ -18,7 +18,6 @@ from .check_policy_gse_to_proxy import CheckPolicyGseToProxyService from .choose_access_point import ChooseAccessPointService from .configure_policy import ConfigurePolicyService -from .delegate_plugin_proc import DelegatePluginProcService from .get_agent_id import GetAgentIDService from .get_agent_status import GetAgentStatusService from .install import InstallService @@ -72,12 +71,6 @@ class ConfigurePolicyComponent(Component): bound_service = ConfigurePolicyService -class DelegatePluginProcComponent(Component): - name = _("托管插件进程") - code = "delegate_plugin_proc" - bound_service = DelegatePluginProcService - - class GetAgentStatusComponent(Component): name = _("查询Agent状态") code = "get_agent_status" diff --git a/apps/backend/components/collections/agent_new/delegate_plugin_proc.py b/apps/backend/components/collections/agent_new/delegate_plugin_proc.py deleted file mode 100644 index bdf38c75b..000000000 --- a/apps/backend/components/collections/agent_new/delegate_plugin_proc.py +++ /dev/null @@ -1,295 +0,0 @@ -# -*- coding: utf-8 -*- -""" -TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. -Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. -Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. -You may obtain a copy of the License at https://opensource.org/licenses/MIT -Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on -an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations under the License. -""" -import json -import ntpath -import posixpath -from collections import defaultdict -from typing import Any, Dict, List, Set, Union - -from django.db.models import Max -from django.utils.translation import ugettext_lazy as _ - -from apps.backend.api.constants import ( - GSE_RUNNING_TASK_CODE, - POLLING_INTERVAL, - POLLING_TIMEOUT, - GseDataErrCode, -) -from apps.node_man import constants, models -from apps.utils import concurrent -from common.api import GseApi -from pipeline.core.flow import Service, StaticIntervalGenerator - -from .base import AgentBaseService, AgentCommonData - - -class DelegatePluginProcService(AgentBaseService): - - __need_schedule__ = True - interval = StaticIntervalGenerator(POLLING_INTERVAL) - - def inputs_format(self): - return super().inputs_format() + [ - Service.InputItem(name="plugin_name", key="plugin_name", type="str", required=True) - ] - - def outputs_format(self): - return super().outputs_format() + [ - Service.InputItem(name="task_id", key="task_id", type="str", required=True), - Service.InputItem(name="proc_name", key="proc_name", type="str", required=True), - Service.InputItem(name="polling_time", key="polling_time", type="int", required=True), - ] - - def update_proc_infos( - self, sub_insts: List[models.SubscriptionInstanceRecord], host_objs: List[models.Host], package: models.Packages - ) -> List[int]: - """ - 更新插件进程信息 - :param sub_insts: 订阅实例列表 - :param host_objs: 主机对象列表 - :param package: 插件包 - :return: 成功更新进程信息的主机ID列表 - """ - proc_name = package.project - proc_control: models.ProcControl = package.proc_control - if package.os == constants.PluginOsType.windows: - path_handler = ntpath - else: - path_handler = posixpath - setup_path = path_handler.join( - package.proc_control.install_path, constants.PluginChildDir.OFFICIAL.value, "bin" - ) - pid_path = package.proc_control.pid_path - update_proc_info_params = { - "meta": {"labels": {"proc_name": proc_name}, "namespace": constants.GSE_NAMESPACE, "name": proc_name}, - "spec": { - "control": { - "start_cmd": proc_control.start_cmd, - "stop_cmd": proc_control.stop_cmd, - "restart_cmd": proc_control.restart_cmd, - "reload_cmd": proc_control.reload_cmd or proc_control.restart_cmd, - "kill_cmd": proc_control.kill_cmd, - "version_cmd": proc_control.version_cmd, - "health_cmd": proc_control.health_cmd, - }, - "monitor_policy": {"auto_type": 1}, - "resource": {"mem": 10, "cpu": 10}, - "identity": { - "user": constants.ACCOUNT_MAP.get(package.os, "root"), - "proc_name": proc_name, - "setup_path": setup_path, - "pid_path": pid_path, - }, - }, - } - - sub_inst_ids: List[int] = [sub_inst.id for sub_inst in sub_insts] - self.log_info( - sub_inst_ids=sub_inst_ids, - log_content=_( - "更新 {proc_name} 进程信息:call GSE api -> update_proc_info with params: \n {query_params})" - ).format(proc_name=proc_name, query_params=json.dumps(update_proc_info_params, indent=2)), - ) - update_proc_info_params["hosts"] = [ - { - "ip": host_obj.inner_ip, - "bk_cloud_id": host_obj.bk_cloud_id, - "bk_supplier_id": constants.DEFAULT_SUPPLIER_ID, - } - for host_obj in host_objs - ] - - gse_proc_key__proc_result_map: Dict[str, Dict[str, Any]] = GseApi.update_proc_info(update_proc_info_params) - - success_host_ids: List[int] = [] - success_sub_inst_ids: List[int] = [] - host_id__sub_inst_id_map: Dict[int, int] = { - sub_inst.instance_info["host"]["bk_host_id"]: sub_inst.id for sub_inst in sub_insts - } - for host_obj in host_objs: - 
sub_inst_id = host_id__sub_inst_id_map[host_obj.bk_host_id] - gse_proc_key = ( - f"{host_obj.bk_cloud_id}:{constants.DEFAULT_SUPPLIER_ID}:{host_obj.inner_ip}:" - f"{constants.GSE_NAMESPACE}:{proc_name}" - ) - proc_result = gse_proc_key__proc_result_map.get(gse_proc_key) - if not proc_result: - self.move_insts_to_failed( - [sub_inst_id], - log_content=_("未能查询到进程更新结果,gse_proc_key -> {gse_proc_key}".format(gse_proc_key=gse_proc_key)), - ) - continue - - error_code = proc_result.get("error_code") - if error_code != GseDataErrCode.SUCCESS: - self.move_insts_to_failed( - [sub_inst_id], - log_content=_( - "GSE 更新 {proc_name} 进程信息失败,gse_proc_key -> {gse_proc_key}, " - "error_code -> {error_code}, error_msg -> {error_msg}" - ).format( - proc_name=proc_name, - gse_proc_key=gse_proc_key, - error_code=error_code, - error_msg=proc_result.get("error_msg"), - ), - ) - continue - - success_host_ids.append(host_obj.bk_host_id) - success_sub_inst_ids.append(host_id__sub_inst_id_map[host_obj.bk_host_id]) - - self.log_info( - sub_inst_ids=success_sub_inst_ids, log_content=_("已成功更新 {proc_name} 进程信息").format(proc_name=proc_name) - ) - return success_host_ids - - def _execute(self, data, parent_data, common_data: AgentCommonData): - host_id_obj_map = common_data.host_id_obj_map - plugin_name = data.get_one_of_inputs("plugin_name") - latest_pkg_ids_of_each_os: List[int] = ( - models.Packages.objects.filter(project=plugin_name, cpu_arch=constants.CpuType.x86_64) - .values("os") - .annotate(id=Max("id")) - .values_list("id", flat=True) - ) - packages = models.Packages.objects.filter(id__in=latest_pkg_ids_of_each_os) - os__package_map: Dict[str, models.Packages] = {package.os: package for package in packages} - - skipped_host_ids: Set[int] = set() - bk_host_id__sub_inst_id_map: Dict[int, int] = {} - # 按操作系统类型对订阅实例进行聚合,仅取匹配插件包成功的实例 - host_objs_gby_host_os: Dict[str, List[models.Host]] = defaultdict(list) - sub_insts_gby_host_os: Dict[str, List[models.SubscriptionInstanceRecord]] = defaultdict(list) - for sub_inst in common_data.subscription_instances: - host_obj: models.Host = host_id_obj_map[sub_inst.instance_info["host"]["bk_host_id"]] - # 处理插件包不适配的场景 - if host_obj.os_type.lower() not in os__package_map: - skipped_host_ids.add(host_obj.bk_host_id) - # 考虑这种情况较少且为了简化逻辑,暂不聚合相同日志内容再打印 - self.log_warning( - [sub_inst.id], - log_content=_("「{plugin_name}」不支持操作系统 -> {os_type},跳过该步骤").format( - plugin_name=plugin_name, os_type=host_obj.os_type - ), - ) - continue - - bk_host_id__sub_inst_id_map[host_obj.bk_host_id] = sub_inst.id - sub_insts_gby_host_os[host_obj.os_type].append(sub_inst) - host_objs_gby_host_os[host_obj.os_type].append(host_obj) - - # 根据操作系统,多线程请求 update_proc_infos - update_proc_infos_params_list: List[Dict] = [] - for os_upper, sub_insts in sub_insts_gby_host_os.items(): - package = os__package_map[os_upper.lower()] - update_proc_infos_params_list.append( - {"sub_insts": sub_insts, "host_objs": host_objs_gby_host_os[os_upper], "package": package} - ) - - # 并发请求GSE,更新进程信息 - host_ids_to_be_delegate_proc = concurrent.batch_call( - func=self.update_proc_infos, - params_list=update_proc_infos_params_list, - get_data=lambda x: x, - extend_result=True, - ) - - # 没有需要托管进程的主机,直接结束调度并返回 - if not host_ids_to_be_delegate_proc: - self.finish_schedule() - return - - # 构造需要托管进程的主机信息 - sub_insts_ids_to_be_delegate_proc: List[int] = [] - hosts_need_delegate_proc: List[Dict[str, Union[str, int]]] = [] - for bk_host_id in host_ids_to_be_delegate_proc: - host_obj = host_id_obj_map[bk_host_id] - 
sub_insts_ids_to_be_delegate_proc.append(bk_host_id__sub_inst_id_map[host_obj.bk_host_id]) - hosts_need_delegate_proc.append({"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id}) - - # 构造托管进程参数 - delegate_proc_base_params = { - "meta": {"labels": {"proc_name": plugin_name}, "namespace": constants.GSE_NAMESPACE, "name": plugin_name}, - "op_type": constants.GseOpType.DELEGATE, - } - self.log_info( - sub_insts_ids_to_be_delegate_proc, - log_content=( - _("托管 {proc_name} 进程:call GSE api -> operate_proc_v2 with params: \n {query_params}").format( - proc_name=plugin_name, query_params=json.dumps(delegate_proc_base_params, indent=2) - ) - ), - ) - task_id = GseApi.operate_proc({**delegate_proc_base_params, "hosts": hosts_need_delegate_proc})["task_id"] - self.log_info(sub_insts_ids_to_be_delegate_proc, f"GSE TASK ID: [{task_id}]") - - data.outputs.polling_time = 0 - data.outputs.task_id = task_id - data.outputs.proc_name = plugin_name - data.outputs.skipped_host_ids = skipped_host_ids - - def _schedule(self, data, parent_data, callback_data=None): - task_id = data.get_one_of_outputs("task_id") - polling_time = data.get_one_of_outputs("polling_time") - skipped_host_ids: Set[int] = data.get_one_of_outputs("skipped_host_ids") - - # 查询进程操作结果,raw=True,返回接口完整响应数据 - procs_operate_result = GseApi.get_proc_operate_result({"task_id": task_id}, raw=True) - - api_code = procs_operate_result.get("code") - if api_code == GSE_RUNNING_TASK_CODE: - # GSE_RUNNING_TASK_CODE(1000115) 表示查询的任务等待执行中,还未入到 redis(需继续轮询进行查询) - data.outputs.polling_time = polling_time + POLLING_INTERVAL - return True - - is_finished = True - common_data = self.get_common_data(data) - proc_name = data.get_one_of_outputs("proc_name") - for sub_inst in common_data.subscription_instances: - host_obj = common_data.host_id_obj_map[sub_inst.instance_info["host"]["bk_host_id"]] - gse_proc_key = f"{host_obj.bk_cloud_id}:{host_obj.inner_ip}:{constants.GSE_NAMESPACE}:{proc_name}" - - if host_obj.bk_host_id in skipped_host_ids: - continue - - proc_operate_result = procs_operate_result["data"].get(gse_proc_key) - if not proc_operate_result: - self.move_insts_to_failed( - [sub_inst.id], - log_content=_("未能查询到进程操作结果, task_id -> {task_id}, gse_proc_key -> {key}").format( - task_id=task_id, key=gse_proc_key - ), - ) - continue - - error_msg = proc_operate_result["error_msg"] - error_code = proc_operate_result["error_code"] - if error_code == GseDataErrCode.RUNNING: - # 只要有运行中的任务,则认为未完成,标记 is_finished - is_finished = False - if polling_time + POLLING_INTERVAL > POLLING_TIMEOUT: - self.move_insts_to_failed([sub_inst.id], _("GSE任务轮询超时")) - elif error_code != GseDataErrCode.SUCCESS: - # 其它状态码非 SUCCESS 的任务则认为是失败的 - self.move_insts_to_failed( - [sub_inst.id], - log_content=_("调用 GSE 接口异常:错误码 -> {error_code}「{error_code_alias}」, 错误信息 -> {error_msg}").format( - error_code_alias=GseDataErrCode.ERROR_CODE__ALIAS_MAP.get(error_code, error_code), - error_msg=error_msg, - error_code=error_code, - ), - ) - - if is_finished: - self.finish_schedule() - return True - data.outputs.polling_time = polling_time + POLLING_INTERVAL diff --git a/apps/backend/components/collections/agent_new/get_agent_status.py b/apps/backend/components/collections/agent_new/get_agent_status.py index e54e2f6b2..ed9694c68 100644 --- a/apps/backend/components/collections/agent_new/get_agent_status.py +++ b/apps/backend/components/collections/agent_new/get_agent_status.py @@ -13,9 +13,9 @@ from django.utils.translation import ugettext_lazy as _ +from apps.adapters.api.gse import GseApiHelper 
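Note: the import added above is the module-level `GseApiHelper` singleton defined in `apps/adapters/api/gse/__init__.py`; the hunks below swap the direct `GseApi.get_agent_info` / `GseApi.get_agent_status` calls for `GseApiHelper.list_agent_state`. A minimal sketch of the version dispatch behind that singleton, assuming the `GseVersion` values are the plain strings "V1"/"V2" (as the settings checks removed from plugin.py suggest) and a configured Django environment:

    from apps.adapters.api.gse import GseV1ApiHelper, GseV2ApiHelper, get_gse_api_helper

    # Known versions map straight to their helper class.
    assert get_gse_api_helper("V2") is GseV2ApiHelper
    # Unknown versions log a warning and fall back to the V1 helper.
    assert get_gse_api_helper("not-a-version") is GseV1ApiHelper

Because `GseApiHelper` is instantiated once at import time with `settings.GSE_VERSION` and `GseApi`, the test changes further down patch the helper object (or rebuild it around a mock client) rather than patching `GseApi` directly.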
from apps.backend.api.constants import POLLING_INTERVAL, POLLING_TIMEOUT from apps.node_man import constants, models -from common.api import GseApi from pipeline.core.flow import Service, StaticIntervalGenerator from .base import AgentBaseService, AgentCommonData @@ -70,13 +70,11 @@ def _schedule(self, data, parent_data, callback_data=None): hosts: List[Dict[str, Union[int, str]]] = [] for host_id in host_ids_need_to_query: host_obj = common_data.host_id_obj_map[host_id] - hosts.append({"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id}) + hosts.append( + {"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id, "bk_agent_id": host_obj.bk_agent_id} + ) - # 请求 gse - host_key__agent_info_map: Dict[str, Dict[str, Union[int, str]]] = GseApi.get_agent_info({"hosts": hosts}) - host_key__agent_status_info_map: Dict[str, Dict[str, Union[int, str]]] = GseApi.get_agent_status( - {"hosts": hosts} - ) + agent_id__agent_state_info_map: Dict[str, Dict] = GseApiHelper.list_agent_state(hosts) # 分隔符,用于构造 bk_cloud_id - ip,status - version 等键 sep = ":" @@ -92,20 +90,18 @@ def _schedule(self, data, parent_data, callback_data=None): } for host_id in host_ids_need_to_query: host_obj = common_data.host_id_obj_map[host_id] - host_key = f"{host_obj.bk_cloud_id}{sep}{host_obj.inner_ip}" - # 根据 host_key,取得 agent 版本及状态信息 - agent_info = host_key__agent_info_map.get(host_key, {"version": ""}) - # 默认值为 None 的背景:expect_status 是可传入的,不能设定一个明确状态作为默认值,不然可能与 expect_status 一致 + agent_id = GseApiHelper.get_agent_id(host_obj) + # bk_agent_alive 默认值为 None 的背景:expect_status 是可传入的,不能设定一个明确状态作为默认值,不然可能与 expect_status 一致 # 误判为当前 Agent 状态已符合预期 - agent_status_info = host_key__agent_status_info_map.get(host_key, {"bk_agent_alive": None}) + agent_state_info = agent_id__agent_state_info_map.get(agent_id, {"version": "", "bk_agent_alive": None}) # 获取 agent 版本号及状态 - agent_status = constants.PROC_STATUS_DICT.get(agent_status_info["bk_agent_alive"], None) - agent_version = agent_info["version"] + agent_status = constants.PROC_STATUS_DICT.get(agent_state_info["bk_agent_alive"], None) + agent_version = agent_state_info["version"] self.log_info( host_id__sub_inst_id_map[host_id], log_content=_("查询 GSE 得到主机 [{host_key}] Agent 状态 -> {status_alias}「{status}」, 版本 -> {version}").format( - host_key=host_key, + host_key=agent_id, status_alias=constants.PROC_STATUS_CHN.get(agent_status, agent_status), status=agent_status, version=agent_version or _("无"), diff --git a/apps/backend/components/collections/plugin.py b/apps/backend/components/collections/plugin.py index c814910e7..82e8c755d 100644 --- a/apps/backend/components/collections/plugin.py +++ b/apps/backend/components/collections/plugin.py @@ -23,6 +23,7 @@ from django.utils import timezone from django.utils.translation import ugettext as _ +from apps.adapters.api.gse import GseApiHelper from apps.backend.api.constants import ( GSE_RUNNING_TASK_CODE, POLLING_INTERVAL, @@ -45,7 +46,7 @@ from apps.utils import md5 from apps.utils.batch_request import request_multi_thread from apps.utils.files import PathHandler -from common.api import GseApi, JobApi +from common.api import JobApi from pipeline.component_framework.component import Component from pipeline.core.flow import Service, StaticIntervalGenerator @@ -1112,7 +1113,7 @@ def request_gse_or_finish_schedule(self, proc_operate_req: List, data): """批量请求GSE接口""" # 当请求参数为空时,代表无需请求,直接 finish_schedule 跳过即可 if proc_operate_req: - task_id = GseApi.operate_proc_multi({"proc_operate_req": proc_operate_req})["task_id"] + task_id = 
GseApiHelper.operate_proc_multi(proc_operate_req=proc_operate_req) self.log_info(log_content=f"GSE TASK ID: [{task_id}]") data.outputs.task_id = task_id else: @@ -1149,6 +1150,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): "process_status_id": process_status.id, "subscription_instance_id": subscription_instance.id, }, + "hosts": [{"ip": host.inner_ip, "bk_agent_id": host.bk_agent_id, "bk_cloud_id": host.bk_cloud_id}], "spec": { "identity": { "index_key": "", @@ -1167,10 +1169,6 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): }, }, } - if settings.GSE_VERSION == "V1": - gse_op_params["hosts"] = [{"ip": host.inner_ip, "bk_cloud_id": host.bk_cloud_id}] - else: - gse_op_params["agent_id_list"] = [host.bk_agent_id] self.log_info(subscription_instance.id, json.dumps(gse_op_params, indent=2)) proc_operate_req.append(gse_op_params) @@ -1229,7 +1227,7 @@ def _schedule(self, data, parent_data, callback_data=None): plugin = policy_step_adapter.plugin_desc group_id_instance_map = common_data.group_id_instance_map - result = GseApi.get_proc_operate_result({"task_id": task_id}, raw=True) + result = GseApiHelper.get_proc_operate_result(task_id) api_code = result.get("code") if api_code == GSE_RUNNING_TASK_CODE: # GSE_RUNNING_TASK_CODE(1000115) 表示查询的任务等待执行中,还未入到 redis(需继续轮询进行查询) @@ -1243,10 +1241,7 @@ def _schedule(self, data, parent_data, callback_data=None): subscription_instance = group_id_instance_map.get(process_status.group_id) proc_name = self.get_plugin_meta_name(plugin, process_status) - if host.bk_agent_id: - gse_proc_key = f"{host.bk_agent_id}:{constants.GSE_NAMESPACE}:{proc_name}" - else: - gse_proc_key = f"{host.bk_cloud_id}:{host.inner_ip}:{constants.GSE_NAMESPACE}:{proc_name}" + gse_proc_key = GseApiHelper.get_gse_proc_key(host, constants.GSE_NAMESPACE, proc_name) proc_operate_result = result["data"].get(gse_proc_key) if not proc_operate_result: self.move_insts_to_failed( diff --git a/apps/backend/subscription/steps/plugin.py b/apps/backend/subscription/steps/plugin.py index 9d93267d9..91b82207b 100644 --- a/apps/backend/subscription/steps/plugin.py +++ b/apps/backend/subscription/steps/plugin.py @@ -20,6 +20,7 @@ import six from django.db.models import Q, QuerySet +from apps.adapters.api.gse import GseApiHelper from apps.backend import constants as backend_const from apps.backend.plugin.manager import PluginManager, PluginServiceActivity from apps.backend.subscription import errors, tools @@ -28,7 +29,6 @@ from apps.backend.subscription.steps.base import Action, Step from apps.node_man import constants, models from apps.node_man.exceptions import ApIDNotExistsError -from apps.node_man.periodic_tasks import sync_proc_status_task from apps.utils import concurrent from common.log import logger from pipeline.builder import Data, Var @@ -569,7 +569,7 @@ def handle_check_and_skip_instances( :param instances: :return: """ - host_list: List[Dict[str, Any]] = [] + host_info_list: List[Dict[str, Any]] = [] host_id__instance_id_map: Dict[int, str] = {} for instance_id, instance in instances.items(): bk_host_id = instance["host"].get("bk_host_id") @@ -578,23 +578,25 @@ def handle_check_and_skip_instances( # 忽略未同步主机 continue host_obj = bk_host_id__host_map[bk_host_id] - host_list.append({"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id}) + host_info_list.append( + {"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id, "bk_agent_id": host_obj.bk_agent_id} + ) host_id__instance_id_map[bk_host_id] = instance_id - # TODO 
后续该功能覆盖范围广的情况下,可以考虑通过延时任务回写 DB 的进程状态信息,提高时效性 - proc_statues: List[Dict[str, Any]] = sync_proc_status_task.query_proc_status( - proc_name=self.plugin_name, host_list=host_list + agent_id__readable_proc_status_map: Dict[str, Dict[str, Any]] = GseApiHelper.list_proc_state( + namespace=constants.GSE_NAMESPACE, + proc_name=self.plugin_name, + labels={"proc_name": self.plugin_name}, + host_info_list=host_info_list, + extra_meta_data={}, ) - host_key__readable_proc_status_map: Dict[ - str, Dict[str, Any] - ] = sync_proc_status_task.proc_statues2host_key__readable_proc_status_map(proc_statues) - logger.info(f"host_key__readable_proc_status_map -> {host_key__readable_proc_status_map}") + logger.info(f"agent_id__readable_proc_status_map -> {agent_id__readable_proc_status_map}") for bk_host_id, host_obj in bk_host_id__host_map.items(): + agent_id: str = GseApiHelper.get_agent_id(host_obj) instance_id: str = host_id__instance_id_map[bk_host_id] - host_key: str = f"{host_obj.inner_ip}:{host_obj.bk_cloud_id}" - proc_status: Optional[Dict[str, Any]] = host_key__readable_proc_status_map.get(host_key) + proc_status: Optional[Dict[str, Any]] = agent_id__readable_proc_status_map.get(agent_id) if not proc_status: # 查询不到进程状态信息视为插件状态异常 instance_actions[host_id__instance_id_map[bk_host_id]] = install_action @@ -699,7 +701,7 @@ def _push_migrate_reason(_instance_id: str, **_extra_info): bk_host_id__host_map: Dict[int, models.Host] = { host.bk_host_id: host for host in models.Host.objects.filter(bk_host_id__in=bk_host_ids).only( - "bk_host_id", "inner_ip", "bk_cloud_id", "os_type", "cpu_arch" + "bk_host_id", "bk_agent_id", "inner_ip", "bk_cloud_id", "os_type", "cpu_arch" ) } self.handle_check_and_skip_instances( diff --git a/apps/backend/tests/components/collections/agent_new/test_delegate_plugin_proc.py b/apps/backend/tests/components/collections/agent_new/test_delegate_plugin_proc.py deleted file mode 100644 index 1b85317d3..000000000 --- a/apps/backend/tests/components/collections/agent_new/test_delegate_plugin_proc.py +++ /dev/null @@ -1,351 +0,0 @@ -# -*- coding: utf-8 -*- -""" -TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. -Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. -Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. -You may obtain a copy of the License at https://opensource.org/licenses/MIT -Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on -an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the -specific language governing permissions and limitations under the License. -""" -from collections import ChainMap -from typing import Any, Dict, List, Optional - -import mock - -from apps.backend.api.constants import POLLING_INTERVAL, GseDataErrCode -from apps.backend.components.collections.agent_new import components -from apps.mock_data import api_mkd, common_unit -from apps.mock_data import utils as mock_data_utils -from apps.node_man import constants, models -from apps.utils import basic -from pipeline.component_framework.test import ( - ComponentTestCase, - ExecuteAssertion, - ScheduleAssertion, -) - -from . 
import utils - - -class DelegatePluginProcTestCase(utils.AgentServiceBaseTestCase): - - GSE_API_MOCK_PATHS: List[str] = ["apps.backend.components.collections.agent_new.delegate_plugin_proc.GseApi"] - - proc_name: Optional[str] = None - operate_proc_result: Optional[Dict] = None - update_proc_info_result: Optional[Dict] = None - get_proc_operate_result: Optional[Dict] = None - gse_api_mock_client: Optional[api_mkd.gse.utils.GseApiMockClient] = None - - @classmethod - def get_gse_proc_key(cls, host_obj: models.Host, add_supplier_id: bool = False) -> str: - suffix = f"{host_obj.inner_ip}:{constants.GSE_NAMESPACE}:{cls.proc_name}" - - if add_supplier_id: - return f"{host_obj.bk_cloud_id}:{constants.DEFAULT_SUPPLIER_ID}:{suffix}" - else: - return f"{host_obj.bk_cloud_id}:{suffix}" - - @classmethod - def structure_gse_mock_data(cls): - """ - 构造GSE接口返回数据 - :return: - """ - - cls.update_proc_info_result = { - cls.get_gse_proc_key(host_obj, add_supplier_id=True): { - "content": "", - "error_code": GseDataErrCode.SUCCESS, - "error_msg": "success", - } - for host_obj in cls.obj_factory.host_objs - } - cls.get_proc_operate_result = { - "result": True, - "code": GseDataErrCode.SUCCESS, - "message": "", - "data": { - cls.get_gse_proc_key(host_obj, add_supplier_id=False): { - "content": "", - "error_code": GseDataErrCode.SUCCESS, - "error_msg": "success", - } - for host_obj in cls.obj_factory.host_objs - }, - } - cls.operate_proc_result = api_mkd.gse.unit.OP_RESULT - - @classmethod - def adjust_test_data_in_db(cls): - - cls.proc_name = common_unit.plugin.PLUGIN_NAME - os_types = {host_obj.os_type for host_obj in cls.obj_factory.host_objs} - for os_type in os_types: - package_data = basic.remove_keys_from_dict(origin_data=common_unit.plugin.PACKAGES_MODEL_DATA, keys=["id"]) - package_data.update({"cpu_arch": constants.CpuType.x86_64, "os": os_type.lower()}) - - package_obj = models.Packages(**package_data) - package_obj.save() - - proc_control_data = basic.remove_keys_from_dict( - origin_data=common_unit.plugin.PROC_CONTROL_MODEL_DATA, keys=["id", "plugin_package_id"] - ) - proc_control_data.update({"plugin_package_id": package_obj.id, "os": package_obj.os}) - proc_control_obj = models.ProcControl(**proc_control_data) - proc_control_obj.save() - - @classmethod - def setUpTestData(cls): - super().setUpTestData() - # 初始化DB数据后再修改 - cls.adjust_test_data_in_db() - cls.structure_gse_mock_data() - - @classmethod - def get_default_case_name(cls) -> str: - return "托管插件进程成功" - - def fetch_succeeded_sub_inst_ids(self) -> List[int]: - return self.common_inputs["subscription_instance_ids"] - - def component_cls(self): - return components.DelegatePluginProcComponent - - def init_mock_clients(self): - self.gse_api_mock_client = api_mkd.gse.utils.GseApiMockClient( - update_proc_info_return=mock_data_utils.MockReturn( - return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value, return_obj=self.update_proc_info_result - ), - get_proc_operate_result_return=mock_data_utils.MockReturn( - return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value, return_obj=self.get_proc_operate_result - ), - operate_proc_return=mock_data_utils.MockReturn( - return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value, return_obj=self.operate_proc_result - ), - ) - - def structure_common_inputs(self) -> Dict[str, Any]: - common_inputs = super().structure_common_inputs() - return {**common_inputs, "plugin_name": common_unit.plugin.PLUGIN_NAME} - - def structure_common_outputs(self) -> Dict[str, Any]: - return { - "polling_time": 0, - 
"proc_name": self.proc_name, - "task_id": self.operate_proc_result["task_id"], - "skipped_host_ids": set(), - "succeeded_subscription_instance_ids": self.fetch_succeeded_sub_inst_ids(), - } - - def setUp(self) -> None: - self.init_mock_clients() - for gse_api_mock_path in self.GSE_API_MOCK_PATHS: - mock.patch(gse_api_mock_path, self.gse_api_mock_client).start() - super().setUp() - - def cases(self): - return [ - ComponentTestCase( - name=self.get_default_case_name(), - inputs=self.common_inputs, - parent_data={}, - execute_assertion=ExecuteAssertion( - success=bool(self.fetch_succeeded_sub_inst_ids()), outputs=self.structure_common_outputs() - ), - schedule_assertion=[ - ScheduleAssertion(success=True, schedule_finished=True, outputs=self.structure_common_outputs()), - ], - execute_call_assertion=None, - ) - ] - - -class NotSupportOsTestCase(DelegatePluginProcTestCase): - - REMOVE_PACKAGES = True - - @classmethod - def adjust_test_data_in_db(cls): - super().adjust_test_data_in_db() - if cls.REMOVE_PACKAGES: - models.Packages.objects.filter(project=common_unit.plugin.PLUGIN_NAME).delete() - - @classmethod - def get_default_case_name(cls) -> str: - return "操作系统不支持" - - def cases(self): - return [ - ComponentTestCase( - name=self.get_default_case_name(), - inputs=self.common_inputs, - parent_data={}, - execute_assertion=ExecuteAssertion( - success=bool(self.fetch_succeeded_sub_inst_ids()), - # 操作系统不支持时,task_id 等数据还未写入outputs - outputs={"succeeded_subscription_instance_ids": self.fetch_succeeded_sub_inst_ids()}, - ), - schedule_assertion=[], - execute_call_assertion=None, - ) - ] - - -class UpdateResultNotFoundTestCase(NotSupportOsTestCase): - - REMOVE_PACKAGES = False - - @classmethod - def get_default_case_name(cls) -> str: - return "未能查询到进程更新结果" - - def fetch_succeeded_sub_inst_ids(self) -> List[int]: - return [] - - @classmethod - def structure_gse_mock_data(cls): - """ - 构造GSE接口返回数据 - :return: - """ - super().structure_gse_mock_data() - # 更新插件进程信息接口置空 - cls.update_proc_info_result = {} - - def cases(self): - return [ - ComponentTestCase( - name=self.get_default_case_name(), - inputs=self.common_inputs, - parent_data={}, - execute_assertion=ExecuteAssertion( - success=bool(self.fetch_succeeded_sub_inst_ids()), - outputs={"succeeded_subscription_instance_ids": self.fetch_succeeded_sub_inst_ids()}, - ), - schedule_assertion=None, - execute_call_assertion=None, - ) - ] - - -class OpProcResultNotFoundTestCase(DelegatePluginProcTestCase): - @classmethod - def get_default_case_name(cls) -> str: - return "未能查询到进程操作结果" - - @classmethod - def structure_gse_mock_data(cls): - """ - 构造GSE接口返回数据 - :return: - """ - super().structure_gse_mock_data() - # 更新插件进程信息接口置空 - cls.get_proc_operate_result["data"] = {} - - def cases(self): - return [ - ComponentTestCase( - name=self.get_default_case_name(), - inputs=self.common_inputs, - parent_data={}, - execute_assertion=ExecuteAssertion( - success=bool(self.fetch_succeeded_sub_inst_ids()), outputs=self.structure_common_outputs() - ), - schedule_assertion=[ - ScheduleAssertion( - success=False, - schedule_finished=True, - outputs=dict( - ChainMap({"succeeded_subscription_instance_ids": []}, self.structure_common_outputs()) - ), - ), - ], - execute_call_assertion=None, - ) - ] - - -class OpProcResultErrorTestCase(OpProcResultNotFoundTestCase): - @classmethod - def get_default_case_name(cls) -> str: - return "GSE进程操作错误" - - @classmethod - def structure_gse_mock_data(cls): - """ - 构造GSE接口返回数据 - :return: - """ - super().structure_gse_mock_data() - # 更新插件进程信息接口置空 - 
cls.get_proc_operate_result["data"] = { - cls.get_gse_proc_key(host_obj, add_supplier_id=False): { - "content": "", - "error_code": GseDataErrCode.AGENT_ABNORMAL, - "error_msg": f"can not find connection by ip {host_obj.inner_ip}", - } - for host_obj in cls.obj_factory.host_objs - } - - -class TimeOutTestCase(DelegatePluginProcTestCase): - @classmethod - def get_default_case_name(cls) -> str: - return "GSE操作结果查询超时" - - @classmethod - def structure_gse_mock_data(cls): - """ - 构造GSE接口返回数据 - :return: - """ - super().structure_gse_mock_data() - # 更新插件进程信息接口置空 - cls.get_proc_operate_result["data"] = { - cls.get_gse_proc_key(host_obj, add_supplier_id=False): { - "content": "", - "error_code": GseDataErrCode.RUNNING, - "error_msg": "running", - } - for host_obj in cls.obj_factory.host_objs - } - - def setUp(self) -> None: - super().setUp() - mock.patch( - "apps.backend.components.collections.agent_new.delegate_plugin_proc.POLLING_TIMEOUT", - 2 * POLLING_INTERVAL - 1, - ).start() - - def cases(self): - return [ - ComponentTestCase( - name=self.get_default_case_name(), - inputs=self.common_inputs, - parent_data={}, - execute_assertion=ExecuteAssertion( - success=bool(self.fetch_succeeded_sub_inst_ids()), outputs=self.structure_common_outputs() - ), - schedule_assertion=[ - ScheduleAssertion( - success=True, - schedule_finished=False, - outputs=dict(ChainMap({"polling_time": POLLING_INTERVAL}, self.structure_common_outputs())), - ), - ScheduleAssertion( - success=False, - schedule_finished=False, - outputs=dict( - ChainMap( - {"succeeded_subscription_instance_ids": [], "polling_time": POLLING_INTERVAL * 2}, - self.structure_common_outputs(), - ) - ), - ), - ], - execute_call_assertion=None, - ) - ] diff --git a/apps/backend/tests/components/collections/agent_new/test_get_agent_status.py b/apps/backend/tests/components/collections/agent_new/test_get_agent_status.py index 645562b7f..33291f9d1 100644 --- a/apps/backend/tests/components/collections/agent_new/test_get_agent_status.py +++ b/apps/backend/tests/components/collections/agent_new/test_get_agent_status.py @@ -13,7 +13,9 @@ from typing import Any, Callable, Dict, List, Optional import mock +from django.conf import settings +from apps.adapters.api import gse from apps.backend.api.constants import POLLING_INTERVAL from apps.backend.components.collections.agent_new import components from apps.mock_data import api_mkd, common_unit @@ -30,7 +32,7 @@ class GetAgentStatusTestCaseMixin: EXPECT_STATUS = constants.ProcStateType.RUNNING - GSE_API_MOCK_PATHS: List[str] = ["apps.backend.components.collections.agent_new.get_agent_status.GseApi"] + GSE_API_MOCK_PATHS: List[str] = ["apps.backend.components.collections.agent_new.get_agent_status.GseApiHelper"] host_ids_need_to_next_query: List[int] = None get_agent_info_func: Callable[[Dict], Dict] = None @@ -87,7 +89,12 @@ def component_cls(self): def setUp(self) -> None: self.init_mock_clients() for gse_api_mock_path in self.GSE_API_MOCK_PATHS: - mock.patch(gse_api_mock_path, self.gse_api_mock_client).start() + mock.patch( + gse_api_mock_path, + gse.get_gse_api_helper(settings.GSE_VERSION)( + version=settings.GSE_VERSION, gse_api_obj=self.gse_api_mock_client + ), + ).start() super().setUp() diff --git a/apps/backend/tests/components/collections/plugin/test_gse_operate_proc.py b/apps/backend/tests/components/collections/plugin/test_gse_operate_proc.py index d95344643..2153fc624 100644 --- a/apps/backend/tests/components/collections/plugin/test_gse_operate_proc.py +++ 
b/apps/backend/tests/components/collections/plugin/test_gse_operate_proc.py @@ -10,8 +10,10 @@ """ from unittest.mock import patch +from django.conf import settings from django.test import TestCase +from apps.adapters.api import gse from apps.backend.components.collections.plugin import ( GseOperateProcComponent, GseOperateProcService, @@ -45,7 +47,10 @@ def setUp(self): self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client) self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient) self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client) - self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient) + self.plugin_gseapi = patch( + utils.PLUGIN_GSEAPI, + gse.get_gse_api_helper(settings.GSE_VERSION)(settings.GSE_VERSION, utils.GseMockClient()), + ) self.plugin_gseapi.start() self.cmdb_client.start() diff --git a/apps/backend/tests/components/collections/plugin/test_job_allocate_port.py b/apps/backend/tests/components/collections/plugin/test_job_allocate_port.py index 81505aeeb..5b3d4549b 100644 --- a/apps/backend/tests/components/collections/plugin/test_job_allocate_port.py +++ b/apps/backend/tests/components/collections/plugin/test_job_allocate_port.py @@ -40,7 +40,6 @@ def setUp(self): self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client) self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient) self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client) - self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient) self.cmdb_client.start() self.plugin_client.start() diff --git a/apps/backend/tests/components/collections/plugin/test_remove_config.py b/apps/backend/tests/components/collections/plugin/test_remove_config.py index 5a7b0993f..013850eca 100644 --- a/apps/backend/tests/components/collections/plugin/test_remove_config.py +++ b/apps/backend/tests/components/collections/plugin/test_remove_config.py @@ -40,7 +40,6 @@ def setUp(self): self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client) self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient) self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client) - self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient) self.cmdb_client.start() self.plugin_client.start() diff --git a/apps/backend/tests/components/collections/plugin/test_render_and_push_config.py b/apps/backend/tests/components/collections/plugin/test_render_and_push_config.py index a87f1de4f..ef4c9465e 100644 --- a/apps/backend/tests/components/collections/plugin/test_render_and_push_config.py +++ b/apps/backend/tests/components/collections/plugin/test_render_and_push_config.py @@ -40,7 +40,6 @@ def setUp(self): self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client) self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient) self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client) - self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient) self.cmdb_client.start() self.plugin_client.start() diff --git a/apps/backend/tests/components/collections/plugin/test_transfer_package.py b/apps/backend/tests/components/collections/plugin/test_transfer_package.py index 774719135..434a37644 100644 --- a/apps/backend/tests/components/collections/plugin/test_transfer_package.py +++ 
b/apps/backend/tests/components/collections/plugin/test_transfer_package.py @@ -37,7 +37,6 @@ def setUp(self): patch(utils.CMDB_CLIENT_MOCK_PATH, utils.CmdbClient).start() patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client).start() patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client).start() - patch(utils.PLUGIN_GSEAPI, utils.GseMockClient).start() patch(utils.JOB_JOBAPI, utils.JobMockClient).start() patch(utils.PLUGIN_CLIENT_MOCK_PATH, utils.JobMockClient).start() diff --git a/apps/backend/tests/components/collections/plugin/test_uninstall_package.py b/apps/backend/tests/components/collections/plugin/test_uninstall_package.py index 48ecfbb1a..99f58f9f1 100644 --- a/apps/backend/tests/components/collections/plugin/test_uninstall_package.py +++ b/apps/backend/tests/components/collections/plugin/test_uninstall_package.py @@ -40,7 +40,6 @@ def setUp(self): self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client) self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient) self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client) - self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient) self.cmdb_client.start() self.plugin_client.start() diff --git a/apps/backend/tests/components/collections/plugin/utils.py b/apps/backend/tests/components/collections/plugin/utils.py index d925c3377..be1843290 100644 --- a/apps/backend/tests/components/collections/plugin/utils.py +++ b/apps/backend/tests/components/collections/plugin/utils.py @@ -80,7 +80,7 @@ JOB_MULTI_THREAD_PATH = "apps.backend.components.collections.job.request_multi_thread" -PLUGIN_GSEAPI = "apps.backend.components.collections.plugin.GseApi" +PLUGIN_GSEAPI = "apps.backend.components.collections.plugin.GseApiHelper" # 目标主机信息 INSTANCE_INFO = { "key": "", diff --git a/apps/mock_data/api_mkd/gse/unit.py b/apps/mock_data/api_mkd/gse/unit.py index e7f434f50..813d9f519 100644 --- a/apps/mock_data/api_mkd/gse/unit.py +++ b/apps/mock_data/api_mkd/gse/unit.py @@ -56,25 +56,23 @@ } } -GET_AGENT_STATE_LIST_DATA = [ - { - "bk_agent_id": f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}", - "bk_cloud_id": constants.DEFAULT_CLOUD, - "version": GSE_PROCESS_VERSION, - "run_mode": 0, - "status_code": constants.GseAgentStatusCode.RUNNING.value, - } -] +GSE_V2_AGENT_STATE_DATA = { + "bk_agent_id": f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}", + "bk_cloud_id": constants.DEFAULT_CLOUD, + "version": GSE_PROCESS_VERSION, + "run_mode": 0, + "status_code": constants.GseAgentStatusCode.RUNNING.value, +} -GET_AGENT_NOT_ALIVE_STATE_LIST_DATA = [ - { - "bk_agent_id": f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}", - "bk_cloud_id": constants.DEFAULT_CLOUD, - "version": GSE_PROCESS_VERSION, - "run_mode": 0, - "status_code": constants.GseAgentStatusCode.STOPPED.value, - } -] +GSE_V2_AGENT_NOT_ALIVE_STATE_DATA = { + **GSE_V2_AGENT_STATE_DATA, + "status_code": constants.GseAgentStatusCode.STOPPED.value, +} + + +GET_V2_AGENT_STATE_DATA_LIST = [GSE_V2_AGENT_STATE_DATA] + +GET_V2_AGENT_NOT_ALIVE_STATE_LIST = [GSE_V2_AGENT_NOT_ALIVE_STATE_DATA] GET_AGENT_INFO_LIST_DATA = [ { @@ -124,24 +122,26 @@ def mock_get_agent_info_list(params): return agent_info_list -def mock_get_agent_state_list(params): - agent_state_list = copy.deepcopy(GET_AGENT_STATE_LIST_DATA) - for index, agent_state in enumerate(agent_state_list): - agent_state["bk_agent_id"] = params["agent_id_list"][index] +def mock_v2_cluster_list_agent_state_return(params): + agent_state_list = [] 
+ for agent_id in params["agent_id_list"]: + agent_state_list.append({**GSE_V2_AGENT_STATE_DATA, "bk_agent_id": agent_id}) return agent_state_list def mock_get_proc_status(params): pro_status_data = copy.deepcopy(GET_PROC_STATUS_DATA) - hosts_param = params["hosts"][0] - for index, host_info in enumerate(params["hosts"]): - if f"{hosts_param['bk_cloud_id']}:{hosts_param['ip']}" == hosts_param["agent_id"]: - pro_status_data["proc_infos"][index]["host"] = { - "ip": hosts_param["ip"], - "bk_cloud_id": hosts_param["bk_cloud_id"], - } - else: - pro_status_data["proc_infos"][index]["bk_agent_id"] = hosts_param["agent_id"] + if "hosts" in params: + hosts = params["hosts"] + else: + hosts = [] + for agent_id in params["agent_id_list"]: + bk_cloud_id, ip = agent_id.split(":") + hosts.append({"bk_cloud_id": bk_cloud_id, "ip": ip}) + + for index, host_info in enumerate(hosts): + pro_status_data["proc_infos"][index]["bk_agent_id"] = f"{host_info['bk_cloud_id']}:{host_info['ip']}" + pro_status_data["proc_infos"][index]["host"] = {"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]} # 添加返回一个不存在的key来模拟额外返回的case not_exist_proc_info = copy.deepcopy(pro_status_data["proc_infos"][0]) not_exist_proc_info["bk_agent_id"] = "not_exist_proc_info_agent_id" diff --git a/apps/mock_data/api_mkd/gse/utils.py b/apps/mock_data/api_mkd/gse/utils.py index 2f3b4ae89..49141e4ad 100644 --- a/apps/mock_data/api_mkd/gse/utils.py +++ b/apps/mock_data/api_mkd/gse/utils.py @@ -29,14 +29,11 @@ class GseApiMockClient(utils.BaseMockClient): GET_AGENT_NOT_ALIVE_STATUS_RETURN = utils.MockReturn( return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_AGENT_NOT_ALIVE_STATUS_DATA ) - DEFAULT_GET_AGENT_INFO_LIST_RETURN = utils.MockReturn( - return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_get_agent_info_list - ) - DEFAULT_GET_AGENT_STATE_LIST_RETURN = utils.MockReturn( - return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_get_agent_state_list + DEFAULT_V2_CLUSTER_LIST_AGENT_STATE_RETURN = utils.MockReturn( + return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_v2_cluster_list_agent_state_return ) GET_AGENT_NOT_ALIVE_STATE_LIST_RETURN = utils.MockReturn( - return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_AGENT_NOT_ALIVE_STATE_LIST_DATA + return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_V2_AGENT_NOT_ALIVE_STATE_LIST ) DEFAULT_GET_PROC_STATUS_RETURN = utils.MockReturn( return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_get_proc_status @@ -52,8 +49,7 @@ def __init__( update_proc_info_return=None, get_agent_info_return=DEFAULT_GET_AGENT_INFO_RETURN, get_agent_status_return=DEFAULT_GET_AGENT_STATUS_RETURN, - get_agent_info_list_return=DEFAULT_GET_AGENT_INFO_LIST_RETURN, - get_agent_state_list_return=DEFAULT_GET_AGENT_STATE_LIST_RETURN, + v2_cluster_list_agent_state_return=DEFAULT_V2_CLUSTER_LIST_AGENT_STATE_RETURN, ): super().__init__() self.operate_proc = self.generate_magic_mock(mock_return_obj=operate_proc_return) @@ -64,5 +60,4 @@ def __init__( self.update_proc_info = self.generate_magic_mock(mock_return_obj=update_proc_info_return) self.get_agent_info = self.generate_magic_mock(mock_return_obj=get_agent_info_return) self.get_agent_status = self.generate_magic_mock(mock_return_obj=get_agent_status_return) - self.get_agent_info_list = self.generate_magic_mock(mock_return_obj=get_agent_info_list_return) - self.get_agent_state_list = 
self.generate_magic_mock(mock_return_obj=get_agent_state_list_return) + self.v2_cluster_list_agent_state = self.generate_magic_mock(mock_return_obj=v2_cluster_list_agent_state_return) diff --git a/apps/node_man/periodic_tasks/sync_agent_status_task.py b/apps/node_man/periodic_tasks/sync_agent_status_task.py index 1aa2645ed..378bf0553 100644 --- a/apps/node_man/periodic_tasks/sync_agent_status_task.py +++ b/apps/node_man/periodic_tasks/sync_agent_status_task.py @@ -8,25 +8,20 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +from typing import Dict + from celery.task import periodic_task, task -from django.conf import settings -from django.db.models import Q +from apps.adapters.api.gse import GseApiHelper from apps.node_man import constants from apps.node_man.models import Host, ProcessStatus from apps.utils.periodic_task import calculate_countdown -from common.api import GseApi from common.log import logger @task(queue="default", ignore_result=True) -def update_or_create_host_agent_status(task_id, start, end, has_agent_id=False): - # GSE接口不支持同时传 {cloud_id}:{ip} + {agent_id} 混合模式,因此需要分开处理 - if has_agent_id: - host_queryset = Host.objects.exclude(bk_agent_id__isnull=True).exclude(bk_agent_id__exact="") - else: - host_queryset = Host.objects.filter(Q(bk_agent_id__isnull=True) | Q(bk_agent_id__exact="")) - hosts = host_queryset.values("bk_host_id", "bk_agent_id", "bk_cloud_id", "inner_ip", "node_from")[start:end] +def update_or_create_host_agent_status(task_id, start, end): + hosts = Host.objects.values("bk_host_id", "bk_agent_id", "bk_cloud_id", "inner_ip", "node_from")[start:end] if not hosts: # 结束递归 return @@ -38,35 +33,16 @@ def update_or_create_host_agent_status(task_id, start, end, has_agent_id=False): node_from_map = {} # 生成查询参数host弄表 - agent_id_list = [] - query_host = [] + query_hosts = [] for host in hosts: - if host["bk_agent_id"] and settings.GSE_VERSION != "V1": - agent_id = host["bk_agent_id"] - else: - agent_id = f"{host['bk_cloud_id']}:{host['inner_ip']}" + agent_id = GseApiHelper.get_agent_id(host) bk_host_id_map[agent_id] = host["bk_host_id"] node_from_map[agent_id] = host["node_from"] - query_host.append({"ip": host["inner_ip"], "bk_cloud_id": host["bk_cloud_id"]}) - agent_id_list.append(agent_id) - - # 查询agent状态和版本信息 - if settings.GSE_VERSION == "V1": - agent_status_data = GseApi.get_agent_status({"hosts": query_host}) - agent_info_data = GseApi.get_agent_info({"hosts": query_host}) - else: - # GSE2.0 接口,补充GSE1.0字段 - agent_status_list = GseApi.get_agent_state_list({"agent_id_list": agent_id_list}) - agent_status_data = {} - for agent_status in agent_status_list: - if agent_status.get("status_code") == constants.GseAgentStatusCode.RUNNING.value: - agent_status["bk_agent_alive"] = constants.BkAgentStatus.ALIVE.value - else: - agent_status["bk_agent_alive"] = constants.BkAgentStatus.NOT_ALIVE.value - agent_status_data[agent_status["bk_agent_id"]] = agent_status + query_hosts.append( + {"ip": host["inner_ip"], "bk_cloud_id": host["bk_cloud_id"], "bk_agent_id": host["bk_agent_id"]} + ) - agent_info_list = GseApi.get_agent_info_list({"agent_id_list": agent_id_list}) - agent_info_data = {agent_info["bk_agent_id"]: agent_info for agent_info in agent_info_list} + agent_id__agent_state_info_map: Dict[str, Dict] = GseApiHelper.list_agent_state(query_hosts) # 查询需要更新主机的ProcessStatus对象 process_status_objs = ProcessStatus.objects.filter( @@ 
-84,20 +60,20 @@ def update_or_create_host_agent_status(task_id, start, end, has_agent_id=False): process_objs = [] need_update_node_from_host = [] to_be_created_status = [] - for key, host_info in agent_status_data.items(): - process_status_id = process_status_id_map.get(bk_host_id_map[key], {}).get("id") - is_running = host_info["bk_agent_alive"] == constants.BkAgentStatus.ALIVE.value - version = constants.VERSION_PATTERN.search(agent_info_data[key]["version"]) + for agent_id, agent_state_info in agent_id__agent_state_info_map.items(): + process_status_id = process_status_id_map.get(bk_host_id_map[agent_id], {}).get("id") + is_running = agent_state_info["bk_agent_alive"] == constants.BkAgentStatus.ALIVE.value + version = agent_state_info["version"] if is_running: status = constants.ProcStateType.RUNNING - if node_from_map[key] == constants.NodeFrom.CMDB: + if node_from_map[agent_id] == constants.NodeFrom.CMDB: need_update_node_from_host.append( - Host(bk_host_id=bk_host_id_map[key], node_from=constants.NodeFrom.NODE_MAN) + Host(bk_host_id=bk_host_id_map[agent_id], node_from=constants.NodeFrom.NODE_MAN) ) else: # 状态为0时如果节点管理为CMDB标记为未安装否则为异常 - if node_from_map[key] == constants.NodeFrom.CMDB: + if node_from_map[agent_id] == constants.NodeFrom.CMDB: # NOT_INSTALLED status = constants.ProcStateType.NOT_INSTALLED else: @@ -107,17 +83,11 @@ def update_or_create_host_agent_status(task_id, start, end, has_agent_id=False): if not process_status_id: # 如果不存在ProcessStatus对象需要创建 to_be_created_status.append( - ProcessStatus( - bk_host_id=bk_host_id_map[key], - status=status, - version=version.group() if version else "", - ) + ProcessStatus(bk_host_id=bk_host_id_map[agent_id], status=status, version=version) ) else: process_objs.append( - ProcessStatus( - id=process_status_id, status=status, version=version.group() if (version and is_running) else "" - ) + ProcessStatus(id=process_status_id, status=status, version=(version, "")[not is_running]) ) # 批量更新状态&版本 @@ -139,18 +109,14 @@ def sync_agent_status_periodic_task(): """ task_id = sync_agent_status_periodic_task.request.id logger.info(f"{task_id} | sync_agent_status_task: Start syncing host status.") - for (has_agent_id, queryset) in [ - (True, Host.objects.exclude(bk_agent_id__isnull=True).exclude(bk_agent_id__exact="")), - (False, Host.objects.filter(Q(bk_agent_id__isnull=True) | Q(bk_agent_id__exact=""))), - ]: - count = queryset.count() - for start in range(0, count, constants.QUERY_AGENT_STATUS_HOST_LENS): - countdown = calculate_countdown( - count=count / constants.QUERY_AGENT_STATUS_HOST_LENS, - index=start / constants.QUERY_AGENT_STATUS_HOST_LENS, - duration=constants.SYNC_AGENT_STATUS_TASK_INTERVAL, - ) - logger.info(f"{task_id} | sync_agent_status_task after {countdown} seconds") - update_or_create_host_agent_status.apply_async( - (task_id, start, start + constants.QUERY_AGENT_STATUS_HOST_LENS, has_agent_id), countdown=countdown - ) + count = Host.objects.count() + for start in range(0, count, constants.QUERY_AGENT_STATUS_HOST_LENS): + countdown = calculate_countdown( + count=count / constants.QUERY_AGENT_STATUS_HOST_LENS, + index=start / constants.QUERY_AGENT_STATUS_HOST_LENS, + duration=constants.SYNC_AGENT_STATUS_TASK_INTERVAL, + ) + logger.info(f"{task_id} | sync_agent_status_task after {countdown} seconds") + update_or_create_host_agent_status.apply_async( + (task_id, start, start + constants.QUERY_AGENT_STATUS_HOST_LENS), countdown=countdown + ) diff --git a/apps/node_man/periodic_tasks/sync_cmdb_host.py 
b/apps/node_man/periodic_tasks/sync_cmdb_host.py index dcc41aba3..f77e9ea68 100644 --- a/apps/node_man/periodic_tasks/sync_cmdb_host.py +++ b/apps/node_man/periodic_tasks/sync_cmdb_host.py @@ -32,7 +32,7 @@ def query_biz_hosts(bk_biz_id: int, bk_host_ids: typing.List[int]) -> typing.Lis :return: 主机列表 """ query_params = { - "fields": constants.LIST_BIZ_HOSTS_KWARGS, + "fields": constants.CC_HOST_FIELDS, "host_property_filter": { "condition": "AND", "rules": [{"field": "bk_host_id", "operator": "in", "value": bk_host_ids}], @@ -158,9 +158,9 @@ def update_or_create_host_base(biz_id, task_id, cmdb_host_data): # 查询节点管理已存在的主机 exist_proxy_host_ids: typing.Set[int] = set( - models.Host.objects.filter( - bk_host_id__in=bk_host_ids, node_type=constants.NodeType.PROXY - ).values_list("bk_host_id", flat=True) + models.Host.objects.filter(bk_host_id__in=bk_host_ids, node_type=constants.NodeType.PROXY).values_list( + "bk_host_id", flat=True + ) ) exist_agent_host_ids: typing.Set[int] = set( models.Host.objects.filter(bk_host_id__in=bk_host_ids) diff --git a/apps/node_man/periodic_tasks/sync_proc_status_task.py b/apps/node_man/periodic_tasks/sync_proc_status_task.py index ff8adc585..5b1c42623 100644 --- a/apps/node_man/periodic_tasks/sync_proc_status_task.py +++ b/apps/node_man/periodic_tasks/sync_proc_status_task.py @@ -12,110 +12,60 @@ import typing from celery.task import periodic_task, task -from django.db.models import Q -from apps.core.concurrent import controller +from apps.adapters.api.gse import GseApiHelper from apps.node_man import constants, tools from apps.node_man.models import Host, ProcessStatus -from apps.utils import concurrent from apps.utils.periodic_task import calculate_countdown -from common.api import GseApi from common.log import logger -def get_version(version_str): - version = constants.VERSION_PATTERN.search(version_str) - return version.group() if version else "" - - -@controller.ConcurrentController( - data_list_name="host_list", - batch_call_func=concurrent.batch_call, - get_config_dict_func=lambda: {"limit": constants.QUERY_PROC_STATUS_HOST_LENS}, -) -def query_proc_status(proc_name, host_list): - """ - 上云接口测试 - 200: 0.3s-0.4s (0.41s 0.29s 0.33s) - 500: 0.35s-0.5s (0.34s 0.42s 0.51s) - 1000: 0.5s-0.6s (0.53s 0.56s 0.61s) - 2000: 0.9s (0.91s, 0.93s) - 5000: 2s-4s (2.3s 2.2s 4.1s 4.3s) - """ - kwargs = { - "meta": {"namespace": constants.GSE_NAMESPACE, "name": proc_name, "labels": {"proc_name": proc_name}}, - "hosts": host_list, - "agent_id_list": [host["agent_id"] for host in host_list], - } - data = GseApi.get_proc_status(kwargs) - - return data.get("proc_infos") or [] - - -def proc_statues2host_key__readable_proc_status_map( - proc_statuses: typing.List[typing.Dict[str, typing.Any]] -) -> typing.Dict[str, typing.Dict[str, typing.Any]]: - """ - 将原始的进程状态信息格式化为 主机唯一标识 - 可读的进程状态信息 映射关系 - :param proc_statuses: 进程状态信息 - :return: 主机唯一标识 - 可读的进程状态信息 映射关系 - """ - host_key__readable_proc_status_map: typing.Dict[str, typing.Dict[str, typing.Any]] = {} - for proc_status in proc_statuses: - if "bk_agent_id" in proc_status: - host_key = proc_status["bk_agent_id"] - else: - host_key = f"{proc_status['host']['bk_cloud_id']}:{proc_status['host']['ip']}" - host_key__readable_proc_status_map[host_key] = { - "version": get_version(proc_status["version"]), - "status": constants.PLUGIN_STATUS_DICT[proc_status["status"]], - "is_auto": constants.AutoStateType.AUTO if proc_status["isauto"] else constants.AutoStateType.UNAUTO, - "name": proc_status["meta"]["name"], - } - return 
host_key__readable_proc_status_map - - @task(queue="default", ignore_result=True) -def update_or_create_proc_status(task_id, hosts, sync_proc_list, start): - host_ip_cloud_list = [] - bk_host_id_map = {} - for host in hosts: - if host.bk_agent_id: - agent_id = host.bk_agent_id - else: - agent_id = f"{host.bk_cloud_id}:{host.inner_ip}" - host_ip_cloud_list.append({"ip": host.inner_ip, "bk_cloud_id": host.bk_cloud_id, "agent_id": agent_id}) - bk_host_id_map[agent_id] = host.bk_host_id +def update_or_create_proc_status(task_id, host_objs, sync_proc_list, start): + host_info_list = [] + agent_id__host_id_map = {} + for host_obj in host_objs: + host_info = {"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id, "bk_agent_id": host_obj.bk_agent_id} + host_info_list.append(host_info) + agent_id__host_id_map[GseApiHelper.get_agent_id(host_info)] = host_obj.bk_host_id for proc_name in sync_proc_list: past = time.time() - proc_statuses = query_proc_status(proc_name=proc_name, host_list=host_ip_cloud_list) - logger.info(f"this get_proc_status cost {time.time() - past}s") - host_key__readable_proc_status_map: typing.Dict[ + agent_id__readable_proc_status_map: typing.Dict[ str, typing.Dict[str, typing.Any] - ] = proc_statues2host_key__readable_proc_status_map(proc_statuses) + ] = GseApiHelper.list_proc_state( + namespace=constants.GSE_NAMESPACE, + proc_name=proc_name, + labels={"proc_name": proc_name}, + host_info_list=host_info_list, + extra_meta_data={}, + ) + + logger.info(f"this get_proc_status cost {time.time() - past}s") process_status_objs = ProcessStatus.objects.filter( name=proc_name, - bk_host_id__in=bk_host_id_map.values(), + bk_host_id__in=agent_id__host_id_map.values(), source_type=ProcessStatus.SourceType.DEFAULT, proc_type=constants.ProcType.PLUGIN, is_latest=True, ).values("bk_host_id", "id", "name", "status") host_proc_key__proc_status_map = {} - for item in process_status_objs: - host_proc_key__proc_status_map[f"{item['name']}:{item['bk_host_id']}"] = item + for process_status_obj in process_status_objs: + host_proc_key__proc_status_map[ + f"{process_status_obj['name']}:{process_status_obj['bk_host_id']}" + ] = process_status_obj need_update_status = [] need_create_status = [] - for host_key, readable_proc_status in host_key__readable_proc_status_map.items(): - if host_key not in bk_host_id_map: + for agent_id, readable_proc_status in agent_id__readable_proc_status_map.items(): + if agent_id not in agent_id__host_id_map: continue db_proc_status = host_proc_key__proc_status_map.get( - f'{readable_proc_status["name"]}:{bk_host_id_map[host_key]}' + f'{readable_proc_status["name"]}:{agent_id__host_id_map[agent_id]}' ) # 如果DB中进程状态为手动停止,并且同步回来的进程状态为终止,此时保持手动停止的标记,用于订阅的豁免操作 @@ -144,7 +94,7 @@ def update_or_create_proc_status(task_id, hosts, sync_proc_list, start): name=readable_proc_status["name"], source_type=ProcessStatus.SourceType.DEFAULT, proc_type=constants.ProcType.PLUGIN, - bk_host_id=bk_host_id_map[host_key], + bk_host_id=agent_id__host_id_map[agent_id], is_latest=True, ) # 忽略无用的进程信息 @@ -164,26 +114,22 @@ def update_or_create_proc_status(task_id, hosts, sync_proc_list, start): def sync_proc_status_periodic_task(): sync_proc_list = tools.PluginV2Tools.fetch_head_plugins() task_id = sync_proc_status_periodic_task.request.id - host_queryset_list = [ - Host.objects.exclude(bk_agent_id__isnull=True).exclude(bk_agent_id__exact=""), - Host.objects.filter(Q(bk_agent_id__isnull=True) | Q(bk_agent_id__exact="")), - ] - for host_queryset in host_queryset_list: - count = 
host_queryset.count() - logger.info(f"{task_id} | sync host proc status... host_count={count}.") - - for start in range(0, count, constants.QUERY_PROC_STATUS_HOST_LENS): - countdown = calculate_countdown( - count=count / constants.QUERY_PROC_STATUS_HOST_LENS, - index=start / constants.QUERY_PROC_STATUS_HOST_LENS, - duration=constants.SYNC_PROC_STATUS_TASK_INTERVAL, - ) - logger.info(f"{task_id} | sync host proc status after {countdown} seconds") - - # (task_id, hosts[start: start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start) - update_or_create_proc_status.apply_async( - (task_id, host_queryset[start : start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start), - countdown=countdown, - ) - - logger.info(f"{task_id} | sync host proc status complete.") + host_queryset = Host.objects.all() + count = host_queryset.count() + logger.info(f"{task_id} | sync host proc status... host_count={count}.") + + for start in range(0, count, constants.QUERY_PROC_STATUS_HOST_LENS): + countdown = calculate_countdown( + count=count / constants.QUERY_PROC_STATUS_HOST_LENS, + index=start / constants.QUERY_PROC_STATUS_HOST_LENS, + duration=constants.SYNC_PROC_STATUS_TASK_INTERVAL, + ) + logger.info(f"{task_id} | sync host proc status after {countdown} seconds") + + # (task_id, hosts[start: start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start) + update_or_create_proc_status.apply_async( + (task_id, host_queryset[start : start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start), + countdown=countdown, + ) + + logger.info(f"{task_id} | sync host proc status complete.") diff --git a/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py b/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py index 939b01850..09db7f2c9 100644 --- a/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py +++ b/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py @@ -10,8 +10,10 @@ """ from unittest.mock import patch +from django.conf import settings from django.test import override_settings +from apps.adapters.api import gse from apps.mock_data.api_mkd.gse.unit import GSE_PROCESS_VERSION from apps.mock_data.api_mkd.gse.utils import GseApiMockClient from apps.mock_data.common_unit.host import ( @@ -25,6 +27,7 @@ update_or_create_host_agent_status, ) from apps.utils.unittest.testcase import CustomBaseTestCase +from env.constants import GseVersion class TestSyncAgentStatus(CustomBaseTestCase): @@ -36,7 +39,10 @@ def test_update_or_create_host_agent_status_no_host(self): update_or_create_host_agent_status(None, 0, 1) self.assertEqual(ProcessStatus.objects.count(), 0) - @patch("apps.node_man.periodic_tasks.sync_agent_status_task.GseApi", GseApiMockClient()) + @patch( + "apps.node_man.periodic_tasks.sync_agent_status_task.GseApiHelper", + gse.get_gse_api_helper(settings.GSE_VERSION)(settings.GSE_VERSION, GseApiMockClient()), + ) def test_update_or_create_host_agent_status_alive(self): host = Host.objects.create(**HOST_MODEL_DATA) # 测试创建ProcessStatus对象 @@ -48,8 +54,11 @@ def test_update_or_create_host_agent_status_alive(self): self.assertEqual(host.node_from, constants.NodeFrom.NODE_MAN) @patch( - "apps.node_man.periodic_tasks.sync_agent_status_task.GseApi", - GseApiMockClient(get_agent_status_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATUS_RETURN), + "apps.node_man.periodic_tasks.sync_agent_status_task.GseApiHelper", + gse.get_gse_api_helper(settings.GSE_VERSION)( + settings.GSE_VERSION, + 
GseApiMockClient(get_agent_status_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATUS_RETURN), + ), ) def test_update_or_create_host_agent_status_not_alive(self): host = Host.objects.create(**HOST_MODEL_DATA) @@ -65,19 +74,25 @@ def test_update_or_create_host_agent_status_not_alive(self): process_status = ProcessStatus.objects.get(bk_host_id=host.bk_host_id) self.assertEqual(process_status.status, constants.ProcStateType.TERMINATED) - @override_settings(GSE_VERSION="V2") - @patch("apps.node_man.periodic_tasks.sync_agent_status_task.GseApi", GseApiMockClient()) + @override_settings(GSE_VERSION=GseVersion.V2.value) + @patch( + "apps.node_man.periodic_tasks.sync_agent_status_task.GseApiHelper", + gse.get_gse_api_helper(GseVersion.V2.value)(GseVersion.V2.value, GseApiMockClient()), + ) def test_update_or_create_host_agent_status_alive_gse_v2(self): host = Host.objects.create(**HOST_MODEL_DATA_WITH_AGENT_ID) # 测试创建ProcessStatus对象 - update_or_create_host_agent_status(None, 0, 1, has_agent_id=True) + update_or_create_host_agent_status(None, 0, 1) process_status = ProcessStatus.objects.get(bk_host_id=host.bk_host_id) self.assertEqual(process_status.status, constants.ProcStateType.RUNNING) - @override_settings(GSE_VERSION="V2") + @override_settings(GSE_VERSION=GseVersion.V2.value) @patch( - "apps.node_man.periodic_tasks.sync_agent_status_task.GseApi", - GseApiMockClient(get_agent_state_list_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATE_LIST_RETURN), + "apps.node_man.periodic_tasks.sync_agent_status_task.GseApiHelper", + gse.get_gse_api_helper(GseVersion.V2.value)( + GseVersion.V2.value, + GseApiMockClient(v2_cluster_list_agent_state_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATE_LIST_RETURN), + ), ) def test_update_or_create_host_agent_status_not_alive_gse_v2(self): host = Host.objects.create(**HOST_MODEL_DATA) diff --git a/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py b/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py index 5fcde4ee4..92f4ac88d 100644 --- a/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py +++ b/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py @@ -12,7 +12,9 @@ import importlib import mock +from django.conf import settings +from apps.adapters.api import gse from apps.mock_data.api_mkd.gse.unit import GSE_PROCESS_NAME from apps.mock_data.api_mkd.gse.utils import GseApiMockClient from apps.mock_data.common_unit.host import ( @@ -27,7 +29,6 @@ class TestSyncProcStatus(CustomBaseTestCase): - @classmethod def setUpClass(cls): mock.patch("apps.utils.concurrent.batch_call", concurrent.batch_call_serial).start() @@ -41,7 +42,10 @@ def test_sync_proc_status_periodic_task(self): Host.objects.create(**HOST_MODEL_DATA) sync_proc_status_task.sync_proc_status_periodic_task() - @mock.patch("apps.node_man.periodic_tasks.sync_proc_status_task.GseApi", GseApiMockClient()) + @mock.patch( + "apps.node_man.periodic_tasks.sync_proc_status_task.GseApiHelper", + gse.get_gse_api_helper(settings.GSE_VERSION)(settings.GSE_VERSION, GseApiMockClient()), + ) def test_update_or_create_proc_status(self, *args, **kwargs): host = Host.objects.create(**HOST_MODEL_DATA) # 测试新建proc_status @@ -62,7 +66,10 @@ def test_update_or_create_proc_status(self, *args, **kwargs): [constants.ProcStateType.MANUAL_STOP, host.bk_host_id, GSE_PROCESS_NAME], ) - @mock.patch("apps.node_man.periodic_tasks.sync_proc_status_task.GseApi", GseApiMockClient()) + @mock.patch( + "apps.node_man.periodic_tasks.sync_proc_status_task.GseApiHelper", + 
gse.get_gse_api_helper(settings.GSE_VERSION)(settings.GSE_VERSION, GseApiMockClient()), + ) def test_update_or_create_proc_status_with_agent_id(self, *args, **kwargs): host = Host.objects.create(**HOST_MODEL_DATA_WITH_AGENT_ID) sync_proc_status_task.update_or_create_proc_status(None, [host], [GSE_PROCESS_NAME], 0) diff --git a/common/api/modules/gse.py b/common/api/modules/gse.py index 918eba169..ae819e82a 100644 --- a/common/api/modules/gse.py +++ b/common/api/modules/gse.py @@ -67,15 +67,33 @@ def __init__(self): module=self.MODULE, description="获取Agent状态", ) - self.get_agent_info_list = DataAPI( + self.v2_cluster_list_agent_state = DataAPI( method="POST", - url=GSE_APIGATEWAY_ROOT_V2 + "get_agent_info_list/", + url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/cluster/list_agent_state/", module=self.MODULE, - description="获取Agent版本信息", + description="GSE2.0-获取Agent状态", + ) + self.v2_cluster_list_agent_info = DataAPI( + method="POST", + url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/cluster/list_agent_info/", + module=self.MODULE, + description="GSE2.0-查询Agent详情列表信息", + ) + self.v2_proc_get_proc_status_v2 = DataAPI( + method="POST", + url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/proc/get_proc_status_v2/", + module=self.MODULE, + description="GSE2.0-查询进程状态信息", + ) + self.v2_proc_operate_proc_multi = DataAPI( + method="POST", + url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/proc/operate_proc_multi/", + module=self.MODULE, + description="GSE2.0-批量进程操作", ) - self.get_agent_state_list = DataAPI( + self.v2_proc_get_proc_operate_result_v2 = DataAPI( method="POST", - url=GSE_APIGATEWAY_ROOT_V2 + "get_agent_state_list/", + url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/proc/get_proc_operate_result_v2/", module=self.MODULE, - description="GSE2.0 获取Agent状态", + description="GSE2.0-查询进程操作结果", ) diff --git a/config/default.py b/config/default.py index fce67bc43..6b6266c3b 100644 --- a/config/default.py +++ b/config/default.py @@ -136,8 +136,6 @@ def redirect_func(request): BLUEKING_BIZ_ID = 9991001 # 作业平台版本 JOB_VERSION = os.getenv("JOB_VERSION") or "V3" -# 管控平台平台版本 -GSE_VERSION = os.getenv("GSE_VERSION") or "V1" # 资源池业务ID BK_CMDB_RESOURCE_POOL_BIZ_ID = int(os.getenv("BK_CMDB_RESOURCE_POOL_BIZ_ID", 1)) or 1 BK_CMDB_RESOURCE_POOL_BIZ_NAME = "资源池" @@ -653,6 +651,9 @@ def get_standard_redis_mode(cls, config_redis_mode: str, default: Optional[str] BKAPP_EE_SOPS_TEMPLATE_ID = os.getenv("BKAPP_EE_SOPS_TEMPLATE_ID") BKAPP_REQUEST_EE_SOPS_BK_BIZ_ID = os.getenv("BKAPP_REQUEST_EE_SOPS_BK_BIZ_ID") + +# 管控平台平台版本 +GSE_VERSION = env.GSE_VERSION # agent 安装路径配置 GSE_AGENT_HOME = os.getenv("BKAPP_GSE_AGENT_HOME") or "/usr/local/gse" GSE_AGENT_LOG_DIR = os.getenv("BKAPP_GSE_AGENT_LOG_DIR") or "/var/log/gse" diff --git a/env/__init__.py b/env/__init__.py index 6ed594962..bd1cb0b43 100644 --- a/env/__init__.py +++ b/env/__init__.py @@ -21,6 +21,7 @@ "LOG_TYPE", "LOG_LEVEL", "BK_LOG_DIR", + "GSE_VERSION", "ENVIRONMENT", # esb 访问地址 "BK_COMPONENT_API_URL", @@ -53,3 +54,10 @@ LOG_LEVEL = get_type_env(key="LOG_LEVEL", default="INFO", _type=str) # 日志所在目录 BK_LOG_DIR = get_type_env(key="BK_LOG_DIR", default="./../bk_nodeman/logs", _type=str) + + +# =============================================================================== +# 蓝鲸管控平台 +# =============================================================================== +# 平台版本 +GSE_VERSION = get_type_env(key="GSE_VERSION", default=constants.GseVersion.V1.value) diff --git a/env/constants.py b/env/constants.py index 4d9d39a81..fb7d6fd21 100644 --- a/env/constants.py +++ b/env/constants.py @@ -32,3 +32,12 @@ class 
LogType(EnhanceEnum): @classmethod def _get_member__alias_map(cls) -> Dict[Enum, str]: return {cls.DEFAULT: "与开发框架保持一致", cls.STDOUT: "标准输出"} + + +class GseVersion(EnhanceEnum): + V1 = "V1" + V2 = "V2" + + @classmethod + def _get_member__alias_map(cls) -> Dict[Enum, str]: + return {cls.V1: "V1", cls.V2: "V2"} diff --git a/support-files/kubernetes/helm/bk-nodeman/README.md b/support-files/kubernetes/helm/bk-nodeman/README.md index 7243af565..2a99d996a 100644 --- a/support-files/kubernetes/helm/bk-nodeman/README.md +++ b/support-files/kubernetes/helm/bk-nodeman/README.md @@ -290,9 +290,11 @@ externalRabbitMQ: | `config.bkIamV3AppCode` | 蓝鲸权限中心相关配置,权限中心 AppCode | `"bk_iam"` | | `config.bkAppIamResourceApiHost` | 蓝鲸权限中心相关配置,权限中心拉取权限相关资源的访问地址,默认取 `{{ .Values.bkNodemanUrl }}` | `""` | | `config.bkAppBkNodeApiGateway` | 组件 API 接入地址,节点管理网关地址,用于覆盖 `bkComponentApiUrl` 访问节点管理
⚠️ 配置为 `{{ .Values.bkNodemanApiUrl }`} 由于 JWT 校验问题,会导致 Agent 安装步骤中「安装预制插件」失败 | `""` | +| `config.bkAppBkGseApiGateway` | 管控平台 API 访问地址,用于覆盖 `bkComponentApiUrl` 访问管控平台 API | `""` | | `config.bkAppBackendHost` | 节点管理自身模块依赖,后台访问地址,渲染时为空默认取 `{{ .Values.bkNodemanApiUrl }}` | `""` | | `config.bkAppNodemanCallbackUrl` | 节点管理自身模块依赖,后台内网回调地址,渲染时为空取 `{{ .Values.bkNodemanUrl }}/backend` | `""` | | `config.bkAppNodemanOuterCallbackUrl` | 节点管理自身模块依赖,后台外网回调地址,渲染时为空取 `{{ .Values.bkNodemanUrl }}/backend` | `""` | +| `config.gseVersion` | 蓝鲸管控平台版本,默认为 `V1`,可选:`V1` `V2` | `V1` | | `config.gseEnableSvrDisCovery` | 蓝鲸管控平台 Agent,AgentXXDir 仅在初次部署有效,后续可以在页面「全局配置」维护。是否启用 GSE 服务探测,默认为 `true` | `true` | | `config.bkAppGseZkHost` | 蓝鲸管控平台 Agent,zk hosts 信息,host:port,多个 hosts 以 `,` 分隔
⚠️ ZK hosts 将作为 Agent 配置,需要保证 Agent 可访问,所以不能使用 k8s service 信息进行配置
如果 zk 通过 k8s 部署,建议通过 NodePort 等方式暴露服务,使用 NodeIP:NodePort 进行配置 | `127.0.0.1:2181` | | `config.bkAppGseZkAuth` | 蓝鲸管控平台 Agent,ZK 认证信息,用户名:密码 | `bkzk:zkpass` | diff --git a/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml b/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml index 6ab1b2faa..61a365304 100644 --- a/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml +++ b/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml @@ -48,12 +48,16 @@ data: {{- if .Values.config.bkAppBkNodeApiGateway }} BKAPP_BK_NODE_APIGATEWAY: "{{ .Values.config.bkAppBkNodeApiGateway }}/" {{- end }} + {{- if .Values.config.bkAppBkGseApiGateway }} + BKAPP_BK_GSE_APIGATEWAY: "{{ .Values.config.bkAppBkGseApiGateway }}/" + {{- end }} BK_NODEMAN_URL: "{{ .Values.bkNodemanUrl }}" BKAPP_BACKEND_HOST: "{{ .Values.config.bkAppBackendHost | default .Values.bkNodemanApiUrl }}" BKAPP_NODEMAN_CALLBACK_URL: "{{ .Values.config.bkAppNodemanCallbackUrl | default ( printf "%s/%s" .Values.bkNodemanUrl "backend" ) }}" BKAPP_NODEMAN_OUTER_CALLBACK_URL: "{{ .Values.config.bkAppNodemanOuterCallbackUrl | default ( printf "%s/%s" .Values.bkNodemanUrl "backend" ) }}" + GSE_VERSION: "{{ .Values.config.gseVersion }}" GSE_ENABLE_SVR_DISCOVERY: "{{ .Values.config.gseEnableSvrDisCovery }}" BKAPP_GSE_ZK_HOST: "{{ .Values.config.bkAppGseZkHost }}" BKAPP_GSE_ZK_AUTH: "{{ .Values.config.bkAppGseZkAuth }}" diff --git a/support-files/kubernetes/helm/bk-nodeman/values.yaml b/support-files/kubernetes/helm/bk-nodeman/values.yaml index 6ae67dfb3..53ab2e299 100644 --- a/support-files/kubernetes/helm/bk-nodeman/values.yaml +++ b/support-files/kubernetes/helm/bk-nodeman/values.yaml @@ -308,6 +308,8 @@ config: ## ## 节点管理 API 访问地址,用于覆盖 bkComponentApiUrl 访问节点管理 bkAppBkNodeApiGateway: "" + ## 管控平台 API 访问地址,用于覆盖 bkComponentApiUrl 访问管控平台 API + bkAppBkGseApiGateway: "" ## 节点管理自身模块依赖 ## @@ -320,6 +322,8 @@ config: ## 蓝鲸管控平台 Agent,AgentXXDir 仅在初次部署有效,后续可以在页面「全局配置」维护 ## + ## 平台版本,默认为 `V1`,可选:`V1` `V2` + gseVersion: "V1" ## 是否启用 GSE 服务探测,为 `true` 将定期更新默认接入点的 gse svr 信息 gseEnableSvrDisCovery: true ## ZK hosts 信息,host:port,多个 hosts 以 `,` 分隔
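
For reference, a minimal usage sketch of the GseApiHelper seam that the periodic tasks and unit tests above now go through. It assumes an initialized Django / node-man environment (the same context the unit tests run in); the construction call mirrors the patched test decorators, and `get_agent_id` / `list_agent_state` are the helper methods invoked by `update_or_create_host_agent_status`.

```python
# Sketch only: assumes Django settings are configured, as in the unit tests above.
from django.conf import settings

from apps.adapters.api import gse
from apps.mock_data.api_mkd.gse.utils import GseApiMockClient

# Build a version-specific helper the same way the patched tests do,
# here backed by the mock client instead of the real GseApi object.
helper = gse.get_gse_api_helper(settings.GSE_VERSION)(settings.GSE_VERSION, GseApiMockClient())

# Callers no longer branch on settings.GSE_VERSION; they hand host info to the helper.
host_info = {"ip": "127.0.0.1", "bk_cloud_id": 0, "bk_agent_id": ""}
agent_id = helper.get_agent_id(host_info)            # expected "0:127.0.0.1" for a V1-style host
agent_id__state_map = helper.list_agent_state([host_info])
```

The periodic tasks call the module-level `GseApiHelper` in the same way; the mock-backed instance above is only for local experimentation.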
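
The branching that `GseApiHelper.get_agent_id` replaces is visible in the lines removed from `update_or_create_host_agent_status`. As a standalone illustration (not the adapter's actual implementation), the expected mapping looks roughly like this:

```python
from typing import Dict


def expected_agent_id(host: Dict, gse_version: str) -> str:
    """Reconstruction of the branch deleted from sync_agent_status_task.py:
    a V2 host with a non-empty bk_agent_id uses it directly, everything else
    falls back to the legacy "{bk_cloud_id}:{ip}" form. The real logic now
    lives behind GseApiHelper.get_agent_id."""
    if host.get("bk_agent_id") and gse_version != "V1":
        return host["bk_agent_id"]
    ip = host.get("inner_ip") or host.get("ip")
    return f"{host['bk_cloud_id']}:{ip}"


assert expected_agent_id({"bk_cloud_id": 0, "inner_ip": "127.0.0.1", "bk_agent_id": ""}, "V2") == "0:127.0.0.1"
assert expected_agent_id({"bk_cloud_id": 0, "ip": "127.0.0.1", "bk_agent_id": "010000abcdef"}, "V2") == "010000abcdef"
```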
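
Both periodic tasks now page over `Host.objects` as a single queryset and spread the sub-tasks across the sync interval via `calculate_countdown`. A standalone sketch of that scheduling shape follows; the constants and the even-spread formula are illustrative stand-ins, not the project's real values.

```python
QUERY_AGENT_STATUS_HOST_LENS = 500      # stand-in value for illustration
SYNC_AGENT_STATUS_TASK_INTERVAL = 600   # stand-in value (seconds)


def spread_countdown(count: float, index: float, duration: int) -> float:
    # Hypothetical even spread; apps.utils.periodic_task.calculate_countdown
    # may use a different formula or add jitter.
    return duration * index / max(count, 1)


def schedule(host_count: int) -> None:
    for start in range(0, host_count, QUERY_AGENT_STATUS_HOST_LENS):
        countdown = spread_countdown(
            count=host_count / QUERY_AGENT_STATUS_HOST_LENS,
            index=start / QUERY_AGENT_STATUS_HOST_LENS,
            duration=SYNC_AGENT_STATUS_TASK_INTERVAL,
        )
        end = start + QUERY_AGENT_STATUS_HOST_LENS
        # In the real task this is update_or_create_host_agent_status.apply_async(...)
        print(f"slice [{start}, {end}) dispatched after {countdown:.0f}s")


schedule(1200)  # -> three slices dispatched after 0s, 250s and 500s
```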
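
Finally, a self-contained sketch of how the reworked mock behaves: `mock_v2_cluster_list_agent_state_return` answers with one RUNNING state entry per requested agent ID, which is what `MockReturnType.SIDE_EFFECT` ultimately wires onto the mock client's `v2_cluster_list_agent_state` attribute. The literal values below are placeholders for `GSE_PROCESS_VERSION` and `GseAgentStatusCode.RUNNING.value`.

```python
from unittest.mock import MagicMock

GSE_V2_AGENT_STATE_DATA = {
    "bk_agent_id": "0:127.0.0.1",
    "bk_cloud_id": 0,
    "version": "2.0.0",   # placeholder for GSE_PROCESS_VERSION
    "run_mode": 0,
    "status_code": 2,     # placeholder for GseAgentStatusCode.RUNNING.value
}


def mock_v2_cluster_list_agent_state_return(params):
    # One state entry per requested agent ID, mirroring the unit.py side effect.
    return [{**GSE_V2_AGENT_STATE_DATA, "bk_agent_id": agent_id} for agent_id in params["agent_id_list"]]


v2_cluster_list_agent_state = MagicMock(side_effect=mock_v2_cluster_list_agent_state_return)
states = v2_cluster_list_agent_state({"agent_id_list": ["0:127.0.0.1", "010000abcdef"]})
assert [state["bk_agent_id"] for state in states] == ["0:127.0.0.1", "010000abcdef"]
```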