From c0f21cd62112f9e24e6e562bdf21f9cd246967ab Mon Sep 17 00:00:00 2001
From: crayon <873217631@qq.com>
Date: Thu, 9 Jun 2022 10:39:48 +0800
Subject: [PATCH] feature: GSE multi-version API adaptation (closed #780)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
apps/adapters/__init__.py | 10 +
apps/adapters/api/__init__.py | 10 +
apps/adapters/api/gse/__init__.py | 42 +++
apps/adapters/api/gse/base.py | 200 ++++++++++
apps/adapters/api/gse/v1.py | 103 +++++
apps/adapters/api/gse/v2.py | 86 +++++
apps/backend/agent/manager.py | 14 -
.../collections/agent_new/components.py | 7 -
.../agent_new/delegate_plugin_proc.py | 295 ---------------
.../collections/agent_new/get_agent_status.py | 26 +-
apps/backend/components/collections/plugin.py | 17 +-
apps/backend/subscription/steps/plugin.py | 28 +-
.../agent_new/test_delegate_plugin_proc.py | 351 ------------------
.../agent_new/test_get_agent_status.py | 11 +-
.../plugin/test_gse_operate_proc.py | 7 +-
.../plugin/test_job_allocate_port.py | 1 -
.../collections/plugin/test_remove_config.py | 1 -
.../plugin/test_render_and_push_config.py | 1 -
.../plugin/test_transfer_package.py | 1 -
.../plugin/test_uninstall_package.py | 1 -
.../components/collections/plugin/utils.py | 2 +-
apps/mock_data/api_mkd/gse/unit.py | 62 ++--
apps/mock_data/api_mkd/gse/utils.py | 15 +-
.../periodic_tasks/sync_agent_status_task.py | 96 ++---
.../node_man/periodic_tasks/sync_cmdb_host.py | 8 +-
.../periodic_tasks/sync_proc_status_task.py | 146 +++-----
.../test_sync_agent_status_task.py | 33 +-
.../test_sync_proc_status_task.py | 13 +-
common/api/modules/gse.py | 30 +-
config/default.py | 5 +-
env/__init__.py | 8 +
env/constants.py | 9 +
.../kubernetes/helm/bk-nodeman/README.md | 2 +
.../templates/configmaps/env-configmap.yaml | 4 +
.../kubernetes/helm/bk-nodeman/values.yaml | 4 +
35 files changed, 704 insertions(+), 945 deletions(-)
create mode 100644 apps/adapters/__init__.py
create mode 100644 apps/adapters/api/__init__.py
create mode 100644 apps/adapters/api/gse/__init__.py
create mode 100644 apps/adapters/api/gse/base.py
create mode 100644 apps/adapters/api/gse/v1.py
create mode 100644 apps/adapters/api/gse/v2.py
delete mode 100644 apps/backend/components/collections/agent_new/delegate_plugin_proc.py
delete mode 100644 apps/backend/tests/components/collections/agent_new/test_delegate_plugin_proc.py
diff --git a/apps/adapters/__init__.py b/apps/adapters/__init__.py
new file mode 100644
index 000000000..29ed269e0
--- /dev/null
+++ b/apps/adapters/__init__.py
@@ -0,0 +1,10 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
+Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
diff --git a/apps/adapters/api/__init__.py b/apps/adapters/api/__init__.py
new file mode 100644
index 000000000..29ed269e0
--- /dev/null
+++ b/apps/adapters/api/__init__.py
@@ -0,0 +1,10 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
+Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
diff --git a/apps/adapters/api/gse/__init__.py b/apps/adapters/api/gse/__init__.py
new file mode 100644
index 000000000..c2b3866d6
--- /dev/null
+++ b/apps/adapters/api/gse/__init__.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
+Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+import typing
+
+from django.conf import settings
+
+from common.api import GseApi
+from common.log import logger
+from env import constants
+
+from .base import GseApiBaseHelper
+from .v1 import GseV1ApiHelper
+from .v2 import GseV2ApiHelper
+
+GSE_HELPERS: typing.Dict[str, typing.Type[GseApiBaseHelper]] = {
+ constants.GseVersion.V1.value: GseV1ApiHelper,
+ constants.GseVersion.V2.value: GseV2ApiHelper,
+}
+
+
+def get_gse_api_helper(gse_version: str) -> typing.Type[GseApiBaseHelper]:
+ if gse_version not in GSE_HELPERS:
+ logger.warning(
+ f"Get GseApiHelper failed: "
+ f"unsupported gse_version -> {gse_version}, options -> {GSE_HELPERS.values()}; "
+ f"use default -> {constants.GseVersion.V1.value}"
+ )
+ gse_version = constants.GseVersion.V1.value
+ return GSE_HELPERS[gse_version]
+
+
+GseApiHelper: GseApiBaseHelper = get_gse_api_helper(settings.GSE_VERSION)(
+ version=settings.GSE_VERSION, gse_api_obj=GseApi
+)
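
For reference, a minimal usage sketch of the entry points introduced above (host values below are placeholders; the module-level GseApiHelper singleton is bound to settings.GSE_VERSION at import time):

    from apps.adapters.api.gse import GseApiHelper, get_gse_api_helper
    from common.api import GseApi
    from env import constants

    # Query agent state through the singleton for the configured GSE version.
    agent_id__state_map = GseApiHelper.list_agent_state(
        host_info_list=[{"ip": "127.0.0.1", "bk_cloud_id": 0, "bk_agent_id": ""}]
    )

    # Or build a helper explicitly, e.g. to pin a version or inject a mock client in tests.
    v2_helper = get_gse_api_helper(constants.GseVersion.V2.value)(
        version=constants.GseVersion.V2.value, gse_api_obj=GseApi
    )
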
diff --git a/apps/adapters/api/gse/base.py b/apps/adapters/api/gse/base.py
new file mode 100644
index 000000000..a04ba8344
--- /dev/null
+++ b/apps/adapters/api/gse/base.py
@@ -0,0 +1,200 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
+Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+import abc
+import typing
+from collections import ChainMap
+
+from apps.core.concurrent import controller
+from apps.node_man import constants, models
+from apps.utils import concurrent
+from common.api import GseApi
+
+InfoDict = typing.Dict[str, typing.Any]
+InfoDictList = typing.List[InfoDict]
+AgentIdInfoMap = typing.Dict[str, InfoDict]
+
+
+class GseApiBaseHelper(abc.ABC):
+
+ version: str = None
+ gse_api_obj = None
+
+ def __init__(self, version: str, gse_api_obj=GseApi):
+ self.version = version
+ self.gse_api_obj = gse_api_obj
+
+ @abc.abstractmethod
+ def get_agent_id(self, mixed_types_of_host_info: typing.Union[InfoDict, models.Host]) -> str:
+ """
+ Get the unique Agent identifier
+ :param mixed_types_of_host_info: object of mixed types carrying host information
+ :return:
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _list_proc_state(
+ self,
+ namespace: str,
+ proc_name: str,
+ labels: InfoDict,
+ host_info_list: InfoDictList,
+ extra_meta_data: InfoDict,
+ **options,
+ ) -> AgentIdInfoMap:
+ """
+ Get process state information
+ :param namespace: namespace
+ :param proc_name: process name
+ :param labels: labels
+ :param host_info_list: host info list
+ :param extra_meta_data: extra metadata
+ :param options: other parameters that may be required
+ :return:
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _list_agent_state(self, host_info_list: InfoDictList) -> AgentIdInfoMap:
+ """
+ Get Agent state information
+ :param host_info_list: host info list
+ :return: AgentId -> Agent state info mapping
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def preprocessing_proc_operate_info(self, host_info_list: InfoDictList, proc_operate_info: InfoDict) -> InfoDict:
+ """
+ Preprocess process operation info
+ :param host_info_list: host info list
+ :param proc_operate_info: process operation info
+ :return:
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _operate_proc_multi(self, proc_operate_req: InfoDictList, **options) -> str:
+ """
+ Operate processes in batch
+ :param proc_operate_req: list of process operation requests
+ :param options: other parameters that may be required
+ :return:
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def get_proc_operate_result(self, task_id: str) -> InfoDict:
+ """
+ Get the result of a process operation
+ :param task_id: GSE task ID
+ :return:
+ """
+ raise NotImplementedError
+
+ @staticmethod
+ def get_version(version_str: typing.Optional[str]) -> str:
+ version_match: typing.Optional[typing.Match] = constants.VERSION_PATTERN.search(version_str or "")
+ return version_match.group() if version_match else ""
+
+ def get_gse_proc_key(
+ self, mixed_types_of_host_info: typing.Union[InfoDict, models.Host], namespace: str, proc_name: str, **options
+ ) -> str:
+ """
+ Get the unique process key
+ :param mixed_types_of_host_info: object of mixed types carrying host information
+ :param namespace: namespace
+ :param proc_name: process name
+ :param options: other keyword arguments that may be required
+ :return:
+ """
+ return f"{self.get_agent_id(mixed_types_of_host_info)}:{namespace}:{proc_name}"
+
+ def list_proc_state(
+ self,
+ namespace: str,
+ proc_name: str,
+ labels: InfoDict,
+ host_info_list: InfoDictList,
+ extra_meta_data: InfoDict,
+ **options,
+ ) -> AgentIdInfoMap:
+ """
+ Get process state information
+ :param namespace: namespace
+ :param proc_name: process name
+ :param labels: labels
+ :param host_info_list: host info list
+ :param extra_meta_data: extra metadata
+ :param options: other parameters that may be required
+ :return:
+ """
+
+ @controller.ConcurrentController(
+ data_list_name="_host_info_list",
+ batch_call_func=concurrent.batch_call,
+ extend_result=False,
+ get_config_dict_func=lambda: {"limit": constants.QUERY_PROC_STATUS_HOST_LENS},
+ )
+ def get_proc_status_inner(
+ _namespace: str,
+ _proc_name: str,
+ _labels: InfoDict,
+ _host_info_list: InfoDictList,
+ _extra_meta_data: InfoDict,
+ **_options,
+ ) -> typing.List[AgentIdInfoMap]:
+ return self._list_proc_state(_namespace, _proc_name, _labels, _host_info_list, _extra_meta_data, **_options)
+
+ agent_id__proc_status_info_map_list: typing.List[AgentIdInfoMap] = get_proc_status_inner(
+ _namespace=namespace,
+ _proc_name=proc_name,
+ _labels=labels,
+ _host_info_list=host_info_list,
+ _extra_meta_data=extra_meta_data,
+ **options,
+ )
+ return dict(ChainMap(*agent_id__proc_status_info_map_list))
+
+ def list_agent_state(self, host_info_list: InfoDictList) -> AgentIdInfoMap:
+ """
+ Get Agent state information
+ (version / status)
+ :param host_info_list: host info list
+ :return: AgentId -> Agent state info mapping
+ """
+
+ @controller.ConcurrentController(
+ data_list_name="_host_info_list",
+ batch_call_func=concurrent.batch_call,
+ extend_result=False,
+ get_config_dict_func=lambda: {"limit": constants.QUERY_AGENT_STATUS_HOST_LENS},
+ )
+ def list_agent_state_inner(_host_info_list: InfoDictList) -> typing.List[AgentIdInfoMap]:
+ return self._list_agent_state(_host_info_list)
+
+ agent_id__agent_state_info_map_list: typing.List[AgentIdInfoMap] = list_agent_state_inner(
+ _host_info_list=host_info_list
+ )
+ return dict(ChainMap(*agent_id__agent_state_info_map_list))
+
+ def operate_proc_multi(self, proc_operate_req: InfoDictList, **options) -> str:
+ """
+ Operate processes in batch
+ :param proc_operate_req: list of process operation requests
+ :param options: other parameters that may be required
+ :return:
+ """
+ preprocessed_proc_operate_req: InfoDictList = []
+ for proc_operate_info in proc_operate_req:
+ hosts = proc_operate_info.pop("hosts")
+ preprocessed_proc_operate_req.append(self.preprocessing_proc_operate_info(hosts, proc_operate_info))
+ return self._operate_proc_multi(preprocessed_proc_operate_req, **options)
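
The two public query methods above split host_info_list into fixed-size batches via ConcurrentController / concurrent.batch_call and merge the per-batch AgentId maps with ChainMap. A standalone sketch of that chunk-and-merge shape (sequential here for brevity; names and the batch size are illustrative):

    import typing
    from collections import ChainMap

    def batched_query(
        host_info_list: typing.List[dict],
        query_one_batch: typing.Callable[[typing.List[dict]], typing.Dict[str, dict]],
        batch_size: int = 1000,
    ) -> typing.Dict[str, dict]:
        # Each batch returns an "agent id -> info" dict; the real helpers run batches concurrently.
        batch_results: typing.List[typing.Dict[str, dict]] = []
        for start in range(0, len(host_info_list), batch_size):
            batch_results.append(query_one_batch(host_info_list[start : start + batch_size]))
        # dict(ChainMap(...)) keeps the first occurrence on duplicate keys, as in the code above.
        return dict(ChainMap(*batch_results))
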
diff --git a/apps/adapters/api/gse/v1.py b/apps/adapters/api/gse/v1.py
new file mode 100644
index 000000000..c02bd7197
--- /dev/null
+++ b/apps/adapters/api/gse/v1.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
+Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+import typing
+from collections.abc import Mapping
+
+from apps.node_man import constants, models
+
+from . import base
+
+
+class GseV1ApiHelper(base.GseApiBaseHelper):
+ def get_agent_id(self, mixed_types_of_host_info: typing.Union[base.InfoDict, models.Host]) -> str:
+ if isinstance(mixed_types_of_host_info, Mapping):
+ if "host" in mixed_types_of_host_info:
+ return self.get_agent_id(mixed_types_of_host_info["host"])
+ bk_cloud_id: int = mixed_types_of_host_info["bk_cloud_id"]
+ for attr_name in ["ip", "inner_ip", "bk_host_innerip"]:
+ ip = mixed_types_of_host_info.get(attr_name, "")
+ if ip:
+ break
+ if not ip:
+ raise ValueError("can not get ip in dict type host info")
+ else:
+ bk_cloud_id: typing.Optional[int] = getattr(mixed_types_of_host_info, "bk_cloud_id")
+ if bk_cloud_id is None:
+ raise ValueError("can not get bk_cloud_id in obj type host info")
+ for attr_name in ["inner_ip"]:
+ ip = getattr(mixed_types_of_host_info, attr_name, "")
+ if ip:
+ break
+ if not ip:
+ raise ValueError("can not get ip in obj type host info")
+
+ return f"{bk_cloud_id}:{ip}"
+
+ def _list_proc_state(
+ self,
+ namespace: str,
+ proc_name: str,
+ labels: base.InfoDict,
+ host_info_list: base.InfoDictList,
+ extra_meta_data: base.InfoDict,
+ **options,
+ ) -> base.AgentIdInfoMap:
+ hosts: base.InfoDictList = [
+ {"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]} for host_info in host_info_list
+ ]
+ query_params: base.InfoDict = {
+ "meta": {"namespace": constants.GSE_NAMESPACE, "name": proc_name, "labels": {"proc_name": proc_name}},
+ "hosts": hosts,
+ }
+ # API benchmarks (host count: elapsed time)
+ # 200: 0.3s-0.4s (0.41s 0.29s 0.33s)
+ # 500: 0.35s-0.5s (0.34s 0.42s 0.51s)
+ # 1000: 0.5s-0.6s (0.53s 0.56s 0.61s)
+ # 2000: 0.9s (0.91s, 0.93s)
+ # 5000: 2s-4s (2.3s 2.2s 4.1s 4.3s)
+ proc_infos: base.InfoDictList = self.gse_api_obj.get_proc_status(query_params).get("proc_infos") or []
+ agent_id__proc_info_map: base.AgentIdInfoMap = {}
+ for proc_info in proc_infos:
+ agent_id__proc_info_map[self.get_agent_id(proc_info)] = {
+ "version": self.get_version(proc_info.get("version")),
+ "status": constants.PLUGIN_STATUS_DICT[proc_info["status"]],
+ "is_auto": constants.AutoStateType.AUTO if proc_info["isauto"] else constants.AutoStateType.UNAUTO,
+ "name": proc_info["meta"]["name"],
+ }
+ return agent_id__proc_info_map
+
+ def _list_agent_state(self, host_info_list: base.InfoDictList) -> base.AgentIdInfoMap:
+ hosts: base.InfoDictList = [
+ {"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]} for host_info in host_info_list
+ ]
+ agent_id__info_map: base.AgentIdInfoMap = self.gse_api_obj.get_agent_info({"hosts": hosts})
+ agent_id__status_map: base.AgentIdInfoMap = self.gse_api_obj.get_agent_status({"hosts": hosts})
+
+ for agent_id, agent_info in agent_id__info_map.items():
+ agent_info.update(agent_id__status_map.get(agent_id) or {})
+ agent_info["version"] = self.get_version(agent_info.get("version"))
+ agent_info["bk_agent_alive"] = agent_info.get("bk_agent_alive") or constants.BkAgentStatus.NOT_ALIVE.value
+
+ return agent_id__info_map
+
+ def preprocessing_proc_operate_info(
+ self, host_info_list: base.InfoDictList, proc_operate_info: base.InfoDict
+ ) -> base.InfoDict:
+ proc_operate_info["hosts"] = [
+ {"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]} for host_info in host_info_list
+ ]
+ return proc_operate_info
+
+ def _operate_proc_multi(self, proc_operate_req: base.InfoDictList, **options) -> str:
+ return self.gse_api_obj.operate_proc_multi({"proc_operate_req": proc_operate_req})["task_id"]
+
+ def get_proc_operate_result(self, task_id: str) -> base.InfoDict:
+ return self.gse_api_obj.get_proc_operate_result({"task_id": task_id}, raw=True)
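
Since get_gse_proc_key in the base class builds on get_agent_id, the V1 identifiers above come down to plain string formatting; a standalone illustration (IP, namespace and process name are placeholders):

    host_info = {"ip": "127.0.0.1", "bk_cloud_id": 0}
    agent_id = f"{host_info['bk_cloud_id']}:{host_info['ip']}"
    gse_proc_key = f"{agent_id}:nodeman:bkmonitorbeat"  # "{agent_id}:{namespace}:{proc_name}"
    assert agent_id == "0:127.0.0.1"
    assert gse_proc_key == "0:127.0.0.1:nodeman:bkmonitorbeat"
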
diff --git a/apps/adapters/api/gse/v2.py b/apps/adapters/api/gse/v2.py
new file mode 100644
index 000000000..396300e0a
--- /dev/null
+++ b/apps/adapters/api/gse/v2.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
+Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+
+import typing
+from collections.abc import Mapping
+
+from apps.node_man import constants, models
+
+from . import base
+from .v1 import GseV1ApiHelper
+
+
+class GseV2ApiHelper(GseV1ApiHelper):
+ def get_agent_id(self, mixed_types_of_host_info: typing.Union[base.InfoDict, models.Host]) -> str:
+ if isinstance(mixed_types_of_host_info, Mapping):
+ if "host" in mixed_types_of_host_info:
+ return self.get_agent_id(mixed_types_of_host_info["host"])
+ bk_agent_id: typing.Optional[str] = mixed_types_of_host_info.get("bk_agent_id")
+ else:
+ bk_agent_id: typing.Optional[str] = getattr(mixed_types_of_host_info, "bk_agent_id")
+ if not bk_agent_id:
+ return super(GseV2ApiHelper, self).get_agent_id(mixed_types_of_host_info)
+ return bk_agent_id
+
+ def _list_proc_state(
+ self,
+ namespace: str,
+ proc_name: str,
+ labels: base.InfoDict,
+ host_info_list: base.InfoDictList,
+ extra_meta_data: base.InfoDict,
+ **options
+ ) -> base.AgentIdInfoMap:
+ agent_id_list: typing.List[str] = [self.get_agent_id(host_info) for host_info in host_info_list]
+ query_params: base.InfoDict = {
+ "meta": {"namespace": constants.GSE_NAMESPACE, "name": proc_name, "labels": {"proc_name": proc_name}},
+ "agent_id_list": agent_id_list,
+ }
+ proc_infos: base.InfoDictList = (
+ self.gse_api_obj.v2_proc_get_proc_status_v2(query_params).get("proc_infos") or []
+ )
+ agent_id__proc_info_map: base.AgentIdInfoMap = {}
+ for proc_info in proc_infos:
+ proc_info["host"]["bk_agent_id"] = proc_info["bk_agent_id"]
+ agent_id__proc_info_map[self.get_agent_id(proc_info)] = {
+ "version": self.get_version(proc_info.get("version")),
+ "status": constants.PLUGIN_STATUS_DICT[proc_info["status"]],
+ "is_auto": constants.AutoStateType.AUTO if proc_info["isauto"] else constants.AutoStateType.UNAUTO,
+ "name": proc_info["meta"]["name"],
+ }
+ return agent_id__proc_info_map
+
+ def _list_agent_state(self, host_info_list: base.InfoDictList) -> base.AgentIdInfoMap:
+ agent_id_list: typing.List[str] = [self.get_agent_id(host_info) for host_info in host_info_list]
+ agent_state_list: base.InfoDictList = self.gse_api_obj.v2_cluster_list_agent_state(
+ {"agent_id_list": agent_id_list}
+ )
+ agent_id__state_map: base.AgentIdInfoMap = {}
+ for agent_state in agent_state_list:
+ status_code: typing.Optional[int] = agent_state.pop("status_code", None)
+ if status_code == constants.GseAgentStatusCode.RUNNING.value:
+ agent_state["bk_agent_alive"] = constants.BkAgentStatus.ALIVE.value
+ else:
+ agent_state["bk_agent_alive"] = constants.BkAgentStatus.NOT_ALIVE.value
+ agent_id__state_map[agent_state["bk_agent_id"]] = agent_state
+ return agent_id__state_map
+
+ def preprocessing_proc_operate_info(
+ self, host_info_list: base.InfoDictList, proc_operate_info: base.InfoDict
+ ) -> base.InfoDict:
+ proc_operate_info["agent_id_list"] = [self.get_agent_id(host_info) for host_info in host_info_list]
+ return proc_operate_info
+
+ def _operate_proc_multi(self, proc_operate_req: base.InfoDictList, **options) -> str:
+ return self.gse_api_obj.v2_proc_operate_proc_multi({"proc_operate_req": proc_operate_req})["task_id"]
+
+ def get_proc_operate_result(self, task_id: str) -> base.InfoDict:
+ return self.gse_api_obj.v2_proc_get_proc_operate_result_v2({"task_id": task_id}, raw=True)
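
The key behavioral difference from V1 is the agent-id fallback in get_agent_id above: prefer bk_agent_id and fall back to the V1-style "{bk_cloud_id}:{ip}" when it is empty. A standalone illustration of that rule (values are placeholders):

    def agent_id_of(host_info: dict) -> str:
        # Prefer the GSE 2.0 bk_agent_id; otherwise fall back to "{bk_cloud_id}:{ip}".
        return host_info.get("bk_agent_id") or f"{host_info['bk_cloud_id']}:{host_info['ip']}"

    assert agent_id_of({"bk_agent_id": "010000525400xxxxxx", "bk_cloud_id": 0, "ip": "127.0.0.1"}) == "010000525400xxxxxx"
    assert agent_id_of({"bk_agent_id": "", "bk_cloud_id": 0, "ip": "127.0.0.1"}) == "0:127.0.0.1"
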
diff --git a/apps/backend/agent/manager.py b/apps/backend/agent/manager.py
index a38bd674a..287f55052 100644
--- a/apps/backend/agent/manager.py
+++ b/apps/backend/agent/manager.py
@@ -185,20 +185,6 @@ def start_nginx(cls):
act.component.inputs.script_param = Var(type=Var.PLAIN, value="")
return act
- @classmethod
- def delegate_plugin(cls, plugin_name: str):
- """
- 托管插件
- :param plugin_name: 插件名称
- :return:
- """
- act = AgentServiceActivity(
- component_code=components.DelegatePluginProcComponent.code,
- name=_("托管 {plugin_name} 插件进程").format(plugin_name=plugin_name),
- )
- act.component.inputs.plugin_name = Var(type=Var.PLAIN, value=plugin_name)
- return act
-
@classmethod
def render_and_push_gse_config(cls, name=components.RenderAndPushGseConfigComponent.name):
"""渲染并下载 agent 配置"""
diff --git a/apps/backend/components/collections/agent_new/components.py b/apps/backend/components/collections/agent_new/components.py
index 922eaaf6e..9334acfea 100644
--- a/apps/backend/components/collections/agent_new/components.py
+++ b/apps/backend/components/collections/agent_new/components.py
@@ -18,7 +18,6 @@
from .check_policy_gse_to_proxy import CheckPolicyGseToProxyService
from .choose_access_point import ChooseAccessPointService
from .configure_policy import ConfigurePolicyService
-from .delegate_plugin_proc import DelegatePluginProcService
from .get_agent_id import GetAgentIDService
from .get_agent_status import GetAgentStatusService
from .install import InstallService
@@ -72,12 +71,6 @@ class ConfigurePolicyComponent(Component):
bound_service = ConfigurePolicyService
-class DelegatePluginProcComponent(Component):
- name = _("托管插件进程")
- code = "delegate_plugin_proc"
- bound_service = DelegatePluginProcService
-
-
class GetAgentStatusComponent(Component):
name = _("查询Agent状态")
code = "get_agent_status"
diff --git a/apps/backend/components/collections/agent_new/delegate_plugin_proc.py b/apps/backend/components/collections/agent_new/delegate_plugin_proc.py
deleted file mode 100644
index bdf38c75b..000000000
--- a/apps/backend/components/collections/agent_new/delegate_plugin_proc.py
+++ /dev/null
@@ -1,295 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
-Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
-Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
-You may obtain a copy of the License at https://opensource.org/licenses/MIT
-Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-specific language governing permissions and limitations under the License.
-"""
-import json
-import ntpath
-import posixpath
-from collections import defaultdict
-from typing import Any, Dict, List, Set, Union
-
-from django.db.models import Max
-from django.utils.translation import ugettext_lazy as _
-
-from apps.backend.api.constants import (
- GSE_RUNNING_TASK_CODE,
- POLLING_INTERVAL,
- POLLING_TIMEOUT,
- GseDataErrCode,
-)
-from apps.node_man import constants, models
-from apps.utils import concurrent
-from common.api import GseApi
-from pipeline.core.flow import Service, StaticIntervalGenerator
-
-from .base import AgentBaseService, AgentCommonData
-
-
-class DelegatePluginProcService(AgentBaseService):
-
- __need_schedule__ = True
- interval = StaticIntervalGenerator(POLLING_INTERVAL)
-
- def inputs_format(self):
- return super().inputs_format() + [
- Service.InputItem(name="plugin_name", key="plugin_name", type="str", required=True)
- ]
-
- def outputs_format(self):
- return super().outputs_format() + [
- Service.InputItem(name="task_id", key="task_id", type="str", required=True),
- Service.InputItem(name="proc_name", key="proc_name", type="str", required=True),
- Service.InputItem(name="polling_time", key="polling_time", type="int", required=True),
- ]
-
- def update_proc_infos(
- self, sub_insts: List[models.SubscriptionInstanceRecord], host_objs: List[models.Host], package: models.Packages
- ) -> List[int]:
- """
- 更新插件进程信息
- :param sub_insts: 订阅实例列表
- :param host_objs: 主机对象列表
- :param package: 插件包
- :return: 成功更新进程信息的主机ID列表
- """
- proc_name = package.project
- proc_control: models.ProcControl = package.proc_control
- if package.os == constants.PluginOsType.windows:
- path_handler = ntpath
- else:
- path_handler = posixpath
- setup_path = path_handler.join(
- package.proc_control.install_path, constants.PluginChildDir.OFFICIAL.value, "bin"
- )
- pid_path = package.proc_control.pid_path
- update_proc_info_params = {
- "meta": {"labels": {"proc_name": proc_name}, "namespace": constants.GSE_NAMESPACE, "name": proc_name},
- "spec": {
- "control": {
- "start_cmd": proc_control.start_cmd,
- "stop_cmd": proc_control.stop_cmd,
- "restart_cmd": proc_control.restart_cmd,
- "reload_cmd": proc_control.reload_cmd or proc_control.restart_cmd,
- "kill_cmd": proc_control.kill_cmd,
- "version_cmd": proc_control.version_cmd,
- "health_cmd": proc_control.health_cmd,
- },
- "monitor_policy": {"auto_type": 1},
- "resource": {"mem": 10, "cpu": 10},
- "identity": {
- "user": constants.ACCOUNT_MAP.get(package.os, "root"),
- "proc_name": proc_name,
- "setup_path": setup_path,
- "pid_path": pid_path,
- },
- },
- }
-
- sub_inst_ids: List[int] = [sub_inst.id for sub_inst in sub_insts]
- self.log_info(
- sub_inst_ids=sub_inst_ids,
- log_content=_(
- "更新 {proc_name} 进程信息:call GSE api -> update_proc_info with params: \n {query_params})"
- ).format(proc_name=proc_name, query_params=json.dumps(update_proc_info_params, indent=2)),
- )
- update_proc_info_params["hosts"] = [
- {
- "ip": host_obj.inner_ip,
- "bk_cloud_id": host_obj.bk_cloud_id,
- "bk_supplier_id": constants.DEFAULT_SUPPLIER_ID,
- }
- for host_obj in host_objs
- ]
-
- gse_proc_key__proc_result_map: Dict[str, Dict[str, Any]] = GseApi.update_proc_info(update_proc_info_params)
-
- success_host_ids: List[int] = []
- success_sub_inst_ids: List[int] = []
- host_id__sub_inst_id_map: Dict[int, int] = {
- sub_inst.instance_info["host"]["bk_host_id"]: sub_inst.id for sub_inst in sub_insts
- }
- for host_obj in host_objs:
- sub_inst_id = host_id__sub_inst_id_map[host_obj.bk_host_id]
- gse_proc_key = (
- f"{host_obj.bk_cloud_id}:{constants.DEFAULT_SUPPLIER_ID}:{host_obj.inner_ip}:"
- f"{constants.GSE_NAMESPACE}:{proc_name}"
- )
- proc_result = gse_proc_key__proc_result_map.get(gse_proc_key)
- if not proc_result:
- self.move_insts_to_failed(
- [sub_inst_id],
- log_content=_("未能查询到进程更新结果,gse_proc_key -> {gse_proc_key}".format(gse_proc_key=gse_proc_key)),
- )
- continue
-
- error_code = proc_result.get("error_code")
- if error_code != GseDataErrCode.SUCCESS:
- self.move_insts_to_failed(
- [sub_inst_id],
- log_content=_(
- "GSE 更新 {proc_name} 进程信息失败,gse_proc_key -> {gse_proc_key}, "
- "error_code -> {error_code}, error_msg -> {error_msg}"
- ).format(
- proc_name=proc_name,
- gse_proc_key=gse_proc_key,
- error_code=error_code,
- error_msg=proc_result.get("error_msg"),
- ),
- )
- continue
-
- success_host_ids.append(host_obj.bk_host_id)
- success_sub_inst_ids.append(host_id__sub_inst_id_map[host_obj.bk_host_id])
-
- self.log_info(
- sub_inst_ids=success_sub_inst_ids, log_content=_("已成功更新 {proc_name} 进程信息").format(proc_name=proc_name)
- )
- return success_host_ids
-
- def _execute(self, data, parent_data, common_data: AgentCommonData):
- host_id_obj_map = common_data.host_id_obj_map
- plugin_name = data.get_one_of_inputs("plugin_name")
- latest_pkg_ids_of_each_os: List[int] = (
- models.Packages.objects.filter(project=plugin_name, cpu_arch=constants.CpuType.x86_64)
- .values("os")
- .annotate(id=Max("id"))
- .values_list("id", flat=True)
- )
- packages = models.Packages.objects.filter(id__in=latest_pkg_ids_of_each_os)
- os__package_map: Dict[str, models.Packages] = {package.os: package for package in packages}
-
- skipped_host_ids: Set[int] = set()
- bk_host_id__sub_inst_id_map: Dict[int, int] = {}
- # 按操作系统类型对订阅实例进行聚合,仅取匹配插件包成功的实例
- host_objs_gby_host_os: Dict[str, List[models.Host]] = defaultdict(list)
- sub_insts_gby_host_os: Dict[str, List[models.SubscriptionInstanceRecord]] = defaultdict(list)
- for sub_inst in common_data.subscription_instances:
- host_obj: models.Host = host_id_obj_map[sub_inst.instance_info["host"]["bk_host_id"]]
- # 处理插件包不适配的场景
- if host_obj.os_type.lower() not in os__package_map:
- skipped_host_ids.add(host_obj.bk_host_id)
- # 考虑这种情况较少且为了简化逻辑,暂不聚合相同日志内容再打印
- self.log_warning(
- [sub_inst.id],
- log_content=_("「{plugin_name}」不支持操作系统 -> {os_type},跳过该步骤").format(
- plugin_name=plugin_name, os_type=host_obj.os_type
- ),
- )
- continue
-
- bk_host_id__sub_inst_id_map[host_obj.bk_host_id] = sub_inst.id
- sub_insts_gby_host_os[host_obj.os_type].append(sub_inst)
- host_objs_gby_host_os[host_obj.os_type].append(host_obj)
-
- # 根据操作系统,多线程请求 update_proc_infos
- update_proc_infos_params_list: List[Dict] = []
- for os_upper, sub_insts in sub_insts_gby_host_os.items():
- package = os__package_map[os_upper.lower()]
- update_proc_infos_params_list.append(
- {"sub_insts": sub_insts, "host_objs": host_objs_gby_host_os[os_upper], "package": package}
- )
-
- # 并发请求GSE,更新进程信息
- host_ids_to_be_delegate_proc = concurrent.batch_call(
- func=self.update_proc_infos,
- params_list=update_proc_infos_params_list,
- get_data=lambda x: x,
- extend_result=True,
- )
-
- # 没有需要托管进程的主机,直接结束调度并返回
- if not host_ids_to_be_delegate_proc:
- self.finish_schedule()
- return
-
- # 构造需要托管进程的主机信息
- sub_insts_ids_to_be_delegate_proc: List[int] = []
- hosts_need_delegate_proc: List[Dict[str, Union[str, int]]] = []
- for bk_host_id in host_ids_to_be_delegate_proc:
- host_obj = host_id_obj_map[bk_host_id]
- sub_insts_ids_to_be_delegate_proc.append(bk_host_id__sub_inst_id_map[host_obj.bk_host_id])
- hosts_need_delegate_proc.append({"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id})
-
- # 构造托管进程参数
- delegate_proc_base_params = {
- "meta": {"labels": {"proc_name": plugin_name}, "namespace": constants.GSE_NAMESPACE, "name": plugin_name},
- "op_type": constants.GseOpType.DELEGATE,
- }
- self.log_info(
- sub_insts_ids_to_be_delegate_proc,
- log_content=(
- _("托管 {proc_name} 进程:call GSE api -> operate_proc_v2 with params: \n {query_params}").format(
- proc_name=plugin_name, query_params=json.dumps(delegate_proc_base_params, indent=2)
- )
- ),
- )
- task_id = GseApi.operate_proc({**delegate_proc_base_params, "hosts": hosts_need_delegate_proc})["task_id"]
- self.log_info(sub_insts_ids_to_be_delegate_proc, f"GSE TASK ID: [{task_id}]")
-
- data.outputs.polling_time = 0
- data.outputs.task_id = task_id
- data.outputs.proc_name = plugin_name
- data.outputs.skipped_host_ids = skipped_host_ids
-
- def _schedule(self, data, parent_data, callback_data=None):
- task_id = data.get_one_of_outputs("task_id")
- polling_time = data.get_one_of_outputs("polling_time")
- skipped_host_ids: Set[int] = data.get_one_of_outputs("skipped_host_ids")
-
- # 查询进程操作结果,raw=True,返回接口完整响应数据
- procs_operate_result = GseApi.get_proc_operate_result({"task_id": task_id}, raw=True)
-
- api_code = procs_operate_result.get("code")
- if api_code == GSE_RUNNING_TASK_CODE:
- # GSE_RUNNING_TASK_CODE(1000115) 表示查询的任务等待执行中,还未入到 redis(需继续轮询进行查询)
- data.outputs.polling_time = polling_time + POLLING_INTERVAL
- return True
-
- is_finished = True
- common_data = self.get_common_data(data)
- proc_name = data.get_one_of_outputs("proc_name")
- for sub_inst in common_data.subscription_instances:
- host_obj = common_data.host_id_obj_map[sub_inst.instance_info["host"]["bk_host_id"]]
- gse_proc_key = f"{host_obj.bk_cloud_id}:{host_obj.inner_ip}:{constants.GSE_NAMESPACE}:{proc_name}"
-
- if host_obj.bk_host_id in skipped_host_ids:
- continue
-
- proc_operate_result = procs_operate_result["data"].get(gse_proc_key)
- if not proc_operate_result:
- self.move_insts_to_failed(
- [sub_inst.id],
- log_content=_("未能查询到进程操作结果, task_id -> {task_id}, gse_proc_key -> {key}").format(
- task_id=task_id, key=gse_proc_key
- ),
- )
- continue
-
- error_msg = proc_operate_result["error_msg"]
- error_code = proc_operate_result["error_code"]
- if error_code == GseDataErrCode.RUNNING:
- # 只要有运行中的任务,则认为未完成,标记 is_finished
- is_finished = False
- if polling_time + POLLING_INTERVAL > POLLING_TIMEOUT:
- self.move_insts_to_failed([sub_inst.id], _("GSE任务轮询超时"))
- elif error_code != GseDataErrCode.SUCCESS:
- # 其它状态码非 SUCCESS 的任务则认为是失败的
- self.move_insts_to_failed(
- [sub_inst.id],
- log_content=_("调用 GSE 接口异常:错误码 -> {error_code}「{error_code_alias}」, 错误信息 -> {error_msg}").format(
- error_code_alias=GseDataErrCode.ERROR_CODE__ALIAS_MAP.get(error_code, error_code),
- error_msg=error_msg,
- error_code=error_code,
- ),
- )
-
- if is_finished:
- self.finish_schedule()
- return True
- data.outputs.polling_time = polling_time + POLLING_INTERVAL
diff --git a/apps/backend/components/collections/agent_new/get_agent_status.py b/apps/backend/components/collections/agent_new/get_agent_status.py
index e54e2f6b2..ed9694c68 100644
--- a/apps/backend/components/collections/agent_new/get_agent_status.py
+++ b/apps/backend/components/collections/agent_new/get_agent_status.py
@@ -13,9 +13,9 @@
from django.utils.translation import ugettext_lazy as _
+from apps.adapters.api.gse import GseApiHelper
from apps.backend.api.constants import POLLING_INTERVAL, POLLING_TIMEOUT
from apps.node_man import constants, models
-from common.api import GseApi
from pipeline.core.flow import Service, StaticIntervalGenerator
from .base import AgentBaseService, AgentCommonData
@@ -70,13 +70,11 @@ def _schedule(self, data, parent_data, callback_data=None):
hosts: List[Dict[str, Union[int, str]]] = []
for host_id in host_ids_need_to_query:
host_obj = common_data.host_id_obj_map[host_id]
- hosts.append({"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id})
+ hosts.append(
+ {"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id, "bk_agent_id": host_obj.bk_agent_id}
+ )
- # 请求 gse
- host_key__agent_info_map: Dict[str, Dict[str, Union[int, str]]] = GseApi.get_agent_info({"hosts": hosts})
- host_key__agent_status_info_map: Dict[str, Dict[str, Union[int, str]]] = GseApi.get_agent_status(
- {"hosts": hosts}
- )
+ agent_id__agent_state_info_map: Dict[str, Dict] = GseApiHelper.list_agent_state(hosts)
# 分隔符,用于构造 bk_cloud_id - ip,status - version 等键
sep = ":"
@@ -92,20 +90,18 @@ def _schedule(self, data, parent_data, callback_data=None):
}
for host_id in host_ids_need_to_query:
host_obj = common_data.host_id_obj_map[host_id]
- host_key = f"{host_obj.bk_cloud_id}{sep}{host_obj.inner_ip}"
- # 根据 host_key,取得 agent 版本及状态信息
- agent_info = host_key__agent_info_map.get(host_key, {"version": ""})
- # 默认值为 None 的背景:expect_status 是可传入的,不能设定一个明确状态作为默认值,不然可能与 expect_status 一致
+ agent_id = GseApiHelper.get_agent_id(host_obj)
+ # bk_agent_alive 默认值为 None 的背景:expect_status 是可传入的,不能设定一个明确状态作为默认值,不然可能与 expect_status 一致
# 误判为当前 Agent 状态已符合预期
- agent_status_info = host_key__agent_status_info_map.get(host_key, {"bk_agent_alive": None})
+ agent_state_info = agent_id__agent_state_info_map.get(agent_id, {"version": "", "bk_agent_alive": None})
# 获取 agent 版本号及状态
- agent_status = constants.PROC_STATUS_DICT.get(agent_status_info["bk_agent_alive"], None)
- agent_version = agent_info["version"]
+ agent_status = constants.PROC_STATUS_DICT.get(agent_state_info["bk_agent_alive"], None)
+ agent_version = agent_state_info["version"]
self.log_info(
host_id__sub_inst_id_map[host_id],
log_content=_("查询 GSE 得到主机 [{host_key}] Agent 状态 -> {status_alias}「{status}」, 版本 -> {version}").format(
- host_key=host_key,
+ host_key=agent_id,
status_alias=constants.PROC_STATUS_CHN.get(agent_status, agent_status),
status=agent_status,
version=agent_version or _("无"),
diff --git a/apps/backend/components/collections/plugin.py b/apps/backend/components/collections/plugin.py
index c814910e7..82e8c755d 100644
--- a/apps/backend/components/collections/plugin.py
+++ b/apps/backend/components/collections/plugin.py
@@ -23,6 +23,7 @@
from django.utils import timezone
from django.utils.translation import ugettext as _
+from apps.adapters.api.gse import GseApiHelper
from apps.backend.api.constants import (
GSE_RUNNING_TASK_CODE,
POLLING_INTERVAL,
@@ -45,7 +46,7 @@
from apps.utils import md5
from apps.utils.batch_request import request_multi_thread
from apps.utils.files import PathHandler
-from common.api import GseApi, JobApi
+from common.api import JobApi
from pipeline.component_framework.component import Component
from pipeline.core.flow import Service, StaticIntervalGenerator
@@ -1112,7 +1113,7 @@ def request_gse_or_finish_schedule(self, proc_operate_req: List, data):
"""批量请求GSE接口"""
# 当请求参数为空时,代表无需请求,直接 finish_schedule 跳过即可
if proc_operate_req:
- task_id = GseApi.operate_proc_multi({"proc_operate_req": proc_operate_req})["task_id"]
+ task_id = GseApiHelper.operate_proc_multi(proc_operate_req=proc_operate_req)
self.log_info(log_content=f"GSE TASK ID: [{task_id}]")
data.outputs.task_id = task_id
else:
@@ -1149,6 +1150,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
"process_status_id": process_status.id,
"subscription_instance_id": subscription_instance.id,
},
+ "hosts": [{"ip": host.inner_ip, "bk_agent_id": host.bk_agent_id, "bk_cloud_id": host.bk_cloud_id}],
"spec": {
"identity": {
"index_key": "",
@@ -1167,10 +1169,6 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
},
},
}
- if settings.GSE_VERSION == "V1":
- gse_op_params["hosts"] = [{"ip": host.inner_ip, "bk_cloud_id": host.bk_cloud_id}]
- else:
- gse_op_params["agent_id_list"] = [host.bk_agent_id]
self.log_info(subscription_instance.id, json.dumps(gse_op_params, indent=2))
proc_operate_req.append(gse_op_params)
@@ -1229,7 +1227,7 @@ def _schedule(self, data, parent_data, callback_data=None):
plugin = policy_step_adapter.plugin_desc
group_id_instance_map = common_data.group_id_instance_map
- result = GseApi.get_proc_operate_result({"task_id": task_id}, raw=True)
+ result = GseApiHelper.get_proc_operate_result(task_id)
api_code = result.get("code")
if api_code == GSE_RUNNING_TASK_CODE:
# GSE_RUNNING_TASK_CODE(1000115) 表示查询的任务等待执行中,还未入到 redis(需继续轮询进行查询)
@@ -1243,10 +1241,7 @@ def _schedule(self, data, parent_data, callback_data=None):
subscription_instance = group_id_instance_map.get(process_status.group_id)
proc_name = self.get_plugin_meta_name(plugin, process_status)
- if host.bk_agent_id:
- gse_proc_key = f"{host.bk_agent_id}:{constants.GSE_NAMESPACE}:{proc_name}"
- else:
- gse_proc_key = f"{host.bk_cloud_id}:{host.inner_ip}:{constants.GSE_NAMESPACE}:{proc_name}"
+ gse_proc_key = GseApiHelper.get_gse_proc_key(host, constants.GSE_NAMESPACE, proc_name)
proc_operate_result = result["data"].get(gse_proc_key)
if not proc_operate_result:
self.move_insts_to_failed(
diff --git a/apps/backend/subscription/steps/plugin.py b/apps/backend/subscription/steps/plugin.py
index 9d93267d9..91b82207b 100644
--- a/apps/backend/subscription/steps/plugin.py
+++ b/apps/backend/subscription/steps/plugin.py
@@ -20,6 +20,7 @@
import six
from django.db.models import Q, QuerySet
+from apps.adapters.api.gse import GseApiHelper
from apps.backend import constants as backend_const
from apps.backend.plugin.manager import PluginManager, PluginServiceActivity
from apps.backend.subscription import errors, tools
@@ -28,7 +29,6 @@
from apps.backend.subscription.steps.base import Action, Step
from apps.node_man import constants, models
from apps.node_man.exceptions import ApIDNotExistsError
-from apps.node_man.periodic_tasks import sync_proc_status_task
from apps.utils import concurrent
from common.log import logger
from pipeline.builder import Data, Var
@@ -569,7 +569,7 @@ def handle_check_and_skip_instances(
:param instances:
:return:
"""
- host_list: List[Dict[str, Any]] = []
+ host_info_list: List[Dict[str, Any]] = []
host_id__instance_id_map: Dict[int, str] = {}
for instance_id, instance in instances.items():
bk_host_id = instance["host"].get("bk_host_id")
@@ -578,23 +578,25 @@ def handle_check_and_skip_instances(
# 忽略未同步主机
continue
host_obj = bk_host_id__host_map[bk_host_id]
- host_list.append({"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id})
+ host_info_list.append(
+ {"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id, "bk_agent_id": host_obj.bk_agent_id}
+ )
host_id__instance_id_map[bk_host_id] = instance_id
- # TODO 后续该功能覆盖范围广的情况下,可以考虑通过延时任务回写 DB 的进程状态信息,提高时效性
- proc_statues: List[Dict[str, Any]] = sync_proc_status_task.query_proc_status(
- proc_name=self.plugin_name, host_list=host_list
+ agent_id__readable_proc_status_map: Dict[str, Dict[str, Any]] = GseApiHelper.list_proc_state(
+ namespace=constants.GSE_NAMESPACE,
+ proc_name=self.plugin_name,
+ labels={"proc_name": self.plugin_name},
+ host_info_list=host_info_list,
+ extra_meta_data={},
)
- host_key__readable_proc_status_map: Dict[
- str, Dict[str, Any]
- ] = sync_proc_status_task.proc_statues2host_key__readable_proc_status_map(proc_statues)
- logger.info(f"host_key__readable_proc_status_map -> {host_key__readable_proc_status_map}")
+ logger.info(f"agent_id__readable_proc_status_map -> {agent_id__readable_proc_status_map}")
for bk_host_id, host_obj in bk_host_id__host_map.items():
+ agent_id: str = GseApiHelper.get_agent_id(host_obj)
instance_id: str = host_id__instance_id_map[bk_host_id]
- host_key: str = f"{host_obj.inner_ip}:{host_obj.bk_cloud_id}"
- proc_status: Optional[Dict[str, Any]] = host_key__readable_proc_status_map.get(host_key)
+ proc_status: Optional[Dict[str, Any]] = agent_id__readable_proc_status_map.get(agent_id)
if not proc_status:
# 查询不到进程状态信息视为插件状态异常
instance_actions[host_id__instance_id_map[bk_host_id]] = install_action
@@ -699,7 +701,7 @@ def _push_migrate_reason(_instance_id: str, **_extra_info):
bk_host_id__host_map: Dict[int, models.Host] = {
host.bk_host_id: host
for host in models.Host.objects.filter(bk_host_id__in=bk_host_ids).only(
- "bk_host_id", "inner_ip", "bk_cloud_id", "os_type", "cpu_arch"
+ "bk_host_id", "bk_agent_id", "inner_ip", "bk_cloud_id", "os_type", "cpu_arch"
)
}
self.handle_check_and_skip_instances(
diff --git a/apps/backend/tests/components/collections/agent_new/test_delegate_plugin_proc.py b/apps/backend/tests/components/collections/agent_new/test_delegate_plugin_proc.py
deleted file mode 100644
index 1b85317d3..000000000
--- a/apps/backend/tests/components/collections/agent_new/test_delegate_plugin_proc.py
+++ /dev/null
@@ -1,351 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
-Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
-Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
-You may obtain a copy of the License at https://opensource.org/licenses/MIT
-Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-specific language governing permissions and limitations under the License.
-"""
-from collections import ChainMap
-from typing import Any, Dict, List, Optional
-
-import mock
-
-from apps.backend.api.constants import POLLING_INTERVAL, GseDataErrCode
-from apps.backend.components.collections.agent_new import components
-from apps.mock_data import api_mkd, common_unit
-from apps.mock_data import utils as mock_data_utils
-from apps.node_man import constants, models
-from apps.utils import basic
-from pipeline.component_framework.test import (
- ComponentTestCase,
- ExecuteAssertion,
- ScheduleAssertion,
-)
-
-from . import utils
-
-
-class DelegatePluginProcTestCase(utils.AgentServiceBaseTestCase):
-
- GSE_API_MOCK_PATHS: List[str] = ["apps.backend.components.collections.agent_new.delegate_plugin_proc.GseApi"]
-
- proc_name: Optional[str] = None
- operate_proc_result: Optional[Dict] = None
- update_proc_info_result: Optional[Dict] = None
- get_proc_operate_result: Optional[Dict] = None
- gse_api_mock_client: Optional[api_mkd.gse.utils.GseApiMockClient] = None
-
- @classmethod
- def get_gse_proc_key(cls, host_obj: models.Host, add_supplier_id: bool = False) -> str:
- suffix = f"{host_obj.inner_ip}:{constants.GSE_NAMESPACE}:{cls.proc_name}"
-
- if add_supplier_id:
- return f"{host_obj.bk_cloud_id}:{constants.DEFAULT_SUPPLIER_ID}:{suffix}"
- else:
- return f"{host_obj.bk_cloud_id}:{suffix}"
-
- @classmethod
- def structure_gse_mock_data(cls):
- """
- 构造GSE接口返回数据
- :return:
- """
-
- cls.update_proc_info_result = {
- cls.get_gse_proc_key(host_obj, add_supplier_id=True): {
- "content": "",
- "error_code": GseDataErrCode.SUCCESS,
- "error_msg": "success",
- }
- for host_obj in cls.obj_factory.host_objs
- }
- cls.get_proc_operate_result = {
- "result": True,
- "code": GseDataErrCode.SUCCESS,
- "message": "",
- "data": {
- cls.get_gse_proc_key(host_obj, add_supplier_id=False): {
- "content": "",
- "error_code": GseDataErrCode.SUCCESS,
- "error_msg": "success",
- }
- for host_obj in cls.obj_factory.host_objs
- },
- }
- cls.operate_proc_result = api_mkd.gse.unit.OP_RESULT
-
- @classmethod
- def adjust_test_data_in_db(cls):
-
- cls.proc_name = common_unit.plugin.PLUGIN_NAME
- os_types = {host_obj.os_type for host_obj in cls.obj_factory.host_objs}
- for os_type in os_types:
- package_data = basic.remove_keys_from_dict(origin_data=common_unit.plugin.PACKAGES_MODEL_DATA, keys=["id"])
- package_data.update({"cpu_arch": constants.CpuType.x86_64, "os": os_type.lower()})
-
- package_obj = models.Packages(**package_data)
- package_obj.save()
-
- proc_control_data = basic.remove_keys_from_dict(
- origin_data=common_unit.plugin.PROC_CONTROL_MODEL_DATA, keys=["id", "plugin_package_id"]
- )
- proc_control_data.update({"plugin_package_id": package_obj.id, "os": package_obj.os})
- proc_control_obj = models.ProcControl(**proc_control_data)
- proc_control_obj.save()
-
- @classmethod
- def setUpTestData(cls):
- super().setUpTestData()
- # 初始化DB数据后再修改
- cls.adjust_test_data_in_db()
- cls.structure_gse_mock_data()
-
- @classmethod
- def get_default_case_name(cls) -> str:
- return "托管插件进程成功"
-
- def fetch_succeeded_sub_inst_ids(self) -> List[int]:
- return self.common_inputs["subscription_instance_ids"]
-
- def component_cls(self):
- return components.DelegatePluginProcComponent
-
- def init_mock_clients(self):
- self.gse_api_mock_client = api_mkd.gse.utils.GseApiMockClient(
- update_proc_info_return=mock_data_utils.MockReturn(
- return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value, return_obj=self.update_proc_info_result
- ),
- get_proc_operate_result_return=mock_data_utils.MockReturn(
- return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value, return_obj=self.get_proc_operate_result
- ),
- operate_proc_return=mock_data_utils.MockReturn(
- return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value, return_obj=self.operate_proc_result
- ),
- )
-
- def structure_common_inputs(self) -> Dict[str, Any]:
- common_inputs = super().structure_common_inputs()
- return {**common_inputs, "plugin_name": common_unit.plugin.PLUGIN_NAME}
-
- def structure_common_outputs(self) -> Dict[str, Any]:
- return {
- "polling_time": 0,
- "proc_name": self.proc_name,
- "task_id": self.operate_proc_result["task_id"],
- "skipped_host_ids": set(),
- "succeeded_subscription_instance_ids": self.fetch_succeeded_sub_inst_ids(),
- }
-
- def setUp(self) -> None:
- self.init_mock_clients()
- for gse_api_mock_path in self.GSE_API_MOCK_PATHS:
- mock.patch(gse_api_mock_path, self.gse_api_mock_client).start()
- super().setUp()
-
- def cases(self):
- return [
- ComponentTestCase(
- name=self.get_default_case_name(),
- inputs=self.common_inputs,
- parent_data={},
- execute_assertion=ExecuteAssertion(
- success=bool(self.fetch_succeeded_sub_inst_ids()), outputs=self.structure_common_outputs()
- ),
- schedule_assertion=[
- ScheduleAssertion(success=True, schedule_finished=True, outputs=self.structure_common_outputs()),
- ],
- execute_call_assertion=None,
- )
- ]
-
-
-class NotSupportOsTestCase(DelegatePluginProcTestCase):
-
- REMOVE_PACKAGES = True
-
- @classmethod
- def adjust_test_data_in_db(cls):
- super().adjust_test_data_in_db()
- if cls.REMOVE_PACKAGES:
- models.Packages.objects.filter(project=common_unit.plugin.PLUGIN_NAME).delete()
-
- @classmethod
- def get_default_case_name(cls) -> str:
- return "操作系统不支持"
-
- def cases(self):
- return [
- ComponentTestCase(
- name=self.get_default_case_name(),
- inputs=self.common_inputs,
- parent_data={},
- execute_assertion=ExecuteAssertion(
- success=bool(self.fetch_succeeded_sub_inst_ids()),
- # 操作系统不支持时,task_id 等数据还未写入outputs
- outputs={"succeeded_subscription_instance_ids": self.fetch_succeeded_sub_inst_ids()},
- ),
- schedule_assertion=[],
- execute_call_assertion=None,
- )
- ]
-
-
-class UpdateResultNotFoundTestCase(NotSupportOsTestCase):
-
- REMOVE_PACKAGES = False
-
- @classmethod
- def get_default_case_name(cls) -> str:
- return "未能查询到进程更新结果"
-
- def fetch_succeeded_sub_inst_ids(self) -> List[int]:
- return []
-
- @classmethod
- def structure_gse_mock_data(cls):
- """
- 构造GSE接口返回数据
- :return:
- """
- super().structure_gse_mock_data()
- # 更新插件进程信息接口置空
- cls.update_proc_info_result = {}
-
- def cases(self):
- return [
- ComponentTestCase(
- name=self.get_default_case_name(),
- inputs=self.common_inputs,
- parent_data={},
- execute_assertion=ExecuteAssertion(
- success=bool(self.fetch_succeeded_sub_inst_ids()),
- outputs={"succeeded_subscription_instance_ids": self.fetch_succeeded_sub_inst_ids()},
- ),
- schedule_assertion=None,
- execute_call_assertion=None,
- )
- ]
-
-
-class OpProcResultNotFoundTestCase(DelegatePluginProcTestCase):
- @classmethod
- def get_default_case_name(cls) -> str:
- return "未能查询到进程操作结果"
-
- @classmethod
- def structure_gse_mock_data(cls):
- """
- 构造GSE接口返回数据
- :return:
- """
- super().structure_gse_mock_data()
- # 更新插件进程信息接口置空
- cls.get_proc_operate_result["data"] = {}
-
- def cases(self):
- return [
- ComponentTestCase(
- name=self.get_default_case_name(),
- inputs=self.common_inputs,
- parent_data={},
- execute_assertion=ExecuteAssertion(
- success=bool(self.fetch_succeeded_sub_inst_ids()), outputs=self.structure_common_outputs()
- ),
- schedule_assertion=[
- ScheduleAssertion(
- success=False,
- schedule_finished=True,
- outputs=dict(
- ChainMap({"succeeded_subscription_instance_ids": []}, self.structure_common_outputs())
- ),
- ),
- ],
- execute_call_assertion=None,
- )
- ]
-
-
-class OpProcResultErrorTestCase(OpProcResultNotFoundTestCase):
- @classmethod
- def get_default_case_name(cls) -> str:
- return "GSE进程操作错误"
-
- @classmethod
- def structure_gse_mock_data(cls):
- """
- 构造GSE接口返回数据
- :return:
- """
- super().structure_gse_mock_data()
- # 更新插件进程信息接口置空
- cls.get_proc_operate_result["data"] = {
- cls.get_gse_proc_key(host_obj, add_supplier_id=False): {
- "content": "",
- "error_code": GseDataErrCode.AGENT_ABNORMAL,
- "error_msg": f"can not find connection by ip {host_obj.inner_ip}",
- }
- for host_obj in cls.obj_factory.host_objs
- }
-
-
-class TimeOutTestCase(DelegatePluginProcTestCase):
- @classmethod
- def get_default_case_name(cls) -> str:
- return "GSE操作结果查询超时"
-
- @classmethod
- def structure_gse_mock_data(cls):
- """
- 构造GSE接口返回数据
- :return:
- """
- super().structure_gse_mock_data()
- # 更新插件进程信息接口置空
- cls.get_proc_operate_result["data"] = {
- cls.get_gse_proc_key(host_obj, add_supplier_id=False): {
- "content": "",
- "error_code": GseDataErrCode.RUNNING,
- "error_msg": "running",
- }
- for host_obj in cls.obj_factory.host_objs
- }
-
- def setUp(self) -> None:
- super().setUp()
- mock.patch(
- "apps.backend.components.collections.agent_new.delegate_plugin_proc.POLLING_TIMEOUT",
- 2 * POLLING_INTERVAL - 1,
- ).start()
-
- def cases(self):
- return [
- ComponentTestCase(
- name=self.get_default_case_name(),
- inputs=self.common_inputs,
- parent_data={},
- execute_assertion=ExecuteAssertion(
- success=bool(self.fetch_succeeded_sub_inst_ids()), outputs=self.structure_common_outputs()
- ),
- schedule_assertion=[
- ScheduleAssertion(
- success=True,
- schedule_finished=False,
- outputs=dict(ChainMap({"polling_time": POLLING_INTERVAL}, self.structure_common_outputs())),
- ),
- ScheduleAssertion(
- success=False,
- schedule_finished=False,
- outputs=dict(
- ChainMap(
- {"succeeded_subscription_instance_ids": [], "polling_time": POLLING_INTERVAL * 2},
- self.structure_common_outputs(),
- )
- ),
- ),
- ],
- execute_call_assertion=None,
- )
- ]
diff --git a/apps/backend/tests/components/collections/agent_new/test_get_agent_status.py b/apps/backend/tests/components/collections/agent_new/test_get_agent_status.py
index 645562b7f..33291f9d1 100644
--- a/apps/backend/tests/components/collections/agent_new/test_get_agent_status.py
+++ b/apps/backend/tests/components/collections/agent_new/test_get_agent_status.py
@@ -13,7 +13,9 @@
from typing import Any, Callable, Dict, List, Optional
import mock
+from django.conf import settings
+from apps.adapters.api import gse
from apps.backend.api.constants import POLLING_INTERVAL
from apps.backend.components.collections.agent_new import components
from apps.mock_data import api_mkd, common_unit
@@ -30,7 +32,7 @@
class GetAgentStatusTestCaseMixin:
EXPECT_STATUS = constants.ProcStateType.RUNNING
- GSE_API_MOCK_PATHS: List[str] = ["apps.backend.components.collections.agent_new.get_agent_status.GseApi"]
+ GSE_API_MOCK_PATHS: List[str] = ["apps.backend.components.collections.agent_new.get_agent_status.GseApiHelper"]
host_ids_need_to_next_query: List[int] = None
get_agent_info_func: Callable[[Dict], Dict] = None
@@ -87,7 +89,12 @@ def component_cls(self):
def setUp(self) -> None:
self.init_mock_clients()
for gse_api_mock_path in self.GSE_API_MOCK_PATHS:
- mock.patch(gse_api_mock_path, self.gse_api_mock_client).start()
+ mock.patch(
+ gse_api_mock_path,
+ gse.get_gse_api_helper(settings.GSE_VERSION)(
+ version=settings.GSE_VERSION, gse_api_obj=self.gse_api_mock_client
+ ),
+ ).start()
super().setUp()
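
The same wrap-a-mock-client-in-a-real-helper pattern appears again in test_gse_operate_proc.py below; if it keeps spreading, it could be factored into a small test utility. A hedged sketch (the function name is hypothetical):

    from django.conf import settings

    from apps.adapters.api import gse

    def build_gse_api_helper_for_tests(mock_gse_client) -> gse.GseApiBaseHelper:
        """Wrap a mock GSE client in the helper class matching settings.GSE_VERSION."""
        helper_cls = gse.get_gse_api_helper(settings.GSE_VERSION)
        return helper_cls(version=settings.GSE_VERSION, gse_api_obj=mock_gse_client)
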
diff --git a/apps/backend/tests/components/collections/plugin/test_gse_operate_proc.py b/apps/backend/tests/components/collections/plugin/test_gse_operate_proc.py
index d95344643..2153fc624 100644
--- a/apps/backend/tests/components/collections/plugin/test_gse_operate_proc.py
+++ b/apps/backend/tests/components/collections/plugin/test_gse_operate_proc.py
@@ -10,8 +10,10 @@
"""
from unittest.mock import patch
+from django.conf import settings
from django.test import TestCase
+from apps.adapters.api import gse
from apps.backend.components.collections.plugin import (
GseOperateProcComponent,
GseOperateProcService,
@@ -45,7 +47,10 @@ def setUp(self):
self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client)
self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient)
self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client)
- self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient)
+ self.plugin_gseapi = patch(
+ utils.PLUGIN_GSEAPI,
+ gse.get_gse_api_helper(settings.GSE_VERSION)(settings.GSE_VERSION, utils.GseMockClient()),
+ )
self.plugin_gseapi.start()
self.cmdb_client.start()
diff --git a/apps/backend/tests/components/collections/plugin/test_job_allocate_port.py b/apps/backend/tests/components/collections/plugin/test_job_allocate_port.py
index 81505aeeb..5b3d4549b 100644
--- a/apps/backend/tests/components/collections/plugin/test_job_allocate_port.py
+++ b/apps/backend/tests/components/collections/plugin/test_job_allocate_port.py
@@ -40,7 +40,6 @@ def setUp(self):
self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client)
self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient)
self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client)
- self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient)
self.cmdb_client.start()
self.plugin_client.start()
diff --git a/apps/backend/tests/components/collections/plugin/test_remove_config.py b/apps/backend/tests/components/collections/plugin/test_remove_config.py
index 5a7b0993f..013850eca 100644
--- a/apps/backend/tests/components/collections/plugin/test_remove_config.py
+++ b/apps/backend/tests/components/collections/plugin/test_remove_config.py
@@ -40,7 +40,6 @@ def setUp(self):
self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client)
self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient)
self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client)
- self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient)
self.cmdb_client.start()
self.plugin_client.start()
diff --git a/apps/backend/tests/components/collections/plugin/test_render_and_push_config.py b/apps/backend/tests/components/collections/plugin/test_render_and_push_config.py
index a87f1de4f..ef4c9465e 100644
--- a/apps/backend/tests/components/collections/plugin/test_render_and_push_config.py
+++ b/apps/backend/tests/components/collections/plugin/test_render_and_push_config.py
@@ -40,7 +40,6 @@ def setUp(self):
self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client)
self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient)
self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client)
- self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient)
self.cmdb_client.start()
self.plugin_client.start()
diff --git a/apps/backend/tests/components/collections/plugin/test_transfer_package.py b/apps/backend/tests/components/collections/plugin/test_transfer_package.py
index 774719135..434a37644 100644
--- a/apps/backend/tests/components/collections/plugin/test_transfer_package.py
+++ b/apps/backend/tests/components/collections/plugin/test_transfer_package.py
@@ -37,7 +37,6 @@ def setUp(self):
patch(utils.CMDB_CLIENT_MOCK_PATH, utils.CmdbClient).start()
patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client).start()
patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client).start()
- patch(utils.PLUGIN_GSEAPI, utils.GseMockClient).start()
patch(utils.JOB_JOBAPI, utils.JobMockClient).start()
patch(utils.PLUGIN_CLIENT_MOCK_PATH, utils.JobMockClient).start()
diff --git a/apps/backend/tests/components/collections/plugin/test_uninstall_package.py b/apps/backend/tests/components/collections/plugin/test_uninstall_package.py
index 48ecfbb1a..99f58f9f1 100644
--- a/apps/backend/tests/components/collections/plugin/test_uninstall_package.py
+++ b/apps/backend/tests/components/collections/plugin/test_uninstall_package.py
@@ -40,7 +40,6 @@ def setUp(self):
self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client)
self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient)
self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client)
- self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient)
self.cmdb_client.start()
self.plugin_client.start()
diff --git a/apps/backend/tests/components/collections/plugin/utils.py b/apps/backend/tests/components/collections/plugin/utils.py
index d925c3377..be1843290 100644
--- a/apps/backend/tests/components/collections/plugin/utils.py
+++ b/apps/backend/tests/components/collections/plugin/utils.py
@@ -80,7 +80,7 @@
JOB_MULTI_THREAD_PATH = "apps.backend.components.collections.job.request_multi_thread"
-PLUGIN_GSEAPI = "apps.backend.components.collections.plugin.GseApi"
+PLUGIN_GSEAPI = "apps.backend.components.collections.plugin.GseApiHelper"
# Target host information
INSTANCE_INFO = {
"key": "",
diff --git a/apps/mock_data/api_mkd/gse/unit.py b/apps/mock_data/api_mkd/gse/unit.py
index e7f434f50..813d9f519 100644
--- a/apps/mock_data/api_mkd/gse/unit.py
+++ b/apps/mock_data/api_mkd/gse/unit.py
@@ -56,25 +56,23 @@
}
}
-GET_AGENT_STATE_LIST_DATA = [
- {
- "bk_agent_id": f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}",
- "bk_cloud_id": constants.DEFAULT_CLOUD,
- "version": GSE_PROCESS_VERSION,
- "run_mode": 0,
- "status_code": constants.GseAgentStatusCode.RUNNING.value,
- }
-]
+GSE_V2_AGENT_STATE_DATA = {
+ "bk_agent_id": f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}",
+ "bk_cloud_id": constants.DEFAULT_CLOUD,
+ "version": GSE_PROCESS_VERSION,
+ "run_mode": 0,
+ "status_code": constants.GseAgentStatusCode.RUNNING.value,
+}
-GET_AGENT_NOT_ALIVE_STATE_LIST_DATA = [
- {
- "bk_agent_id": f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}",
- "bk_cloud_id": constants.DEFAULT_CLOUD,
- "version": GSE_PROCESS_VERSION,
- "run_mode": 0,
- "status_code": constants.GseAgentStatusCode.STOPPED.value,
- }
-]
+GSE_V2_AGENT_NOT_ALIVE_STATE_DATA = {
+ **GSE_V2_AGENT_STATE_DATA,
+ "status_code": constants.GseAgentStatusCode.STOPPED.value,
+}
+
+
+GET_V2_AGENT_STATE_DATA_LIST = [GSE_V2_AGENT_STATE_DATA]
+
+GET_V2_AGENT_NOT_ALIVE_STATE_LIST = [GSE_V2_AGENT_NOT_ALIVE_STATE_DATA]
GET_AGENT_INFO_LIST_DATA = [
{
@@ -124,24 +122,26 @@ def mock_get_agent_info_list(params):
return agent_info_list
-def mock_get_agent_state_list(params):
- agent_state_list = copy.deepcopy(GET_AGENT_STATE_LIST_DATA)
- for index, agent_state in enumerate(agent_state_list):
- agent_state["bk_agent_id"] = params["agent_id_list"][index]
+def mock_v2_cluster_list_agent_state_return(params):
+ agent_state_list = []
+ for agent_id in params["agent_id_list"]:
+ agent_state_list.append({**GSE_V2_AGENT_STATE_DATA, "bk_agent_id": agent_id})
return agent_state_list
def mock_get_proc_status(params):
pro_status_data = copy.deepcopy(GET_PROC_STATUS_DATA)
- hosts_param = params["hosts"][0]
- for index, host_info in enumerate(params["hosts"]):
- if f"{hosts_param['bk_cloud_id']}:{hosts_param['ip']}" == hosts_param["agent_id"]:
- pro_status_data["proc_infos"][index]["host"] = {
- "ip": hosts_param["ip"],
- "bk_cloud_id": hosts_param["bk_cloud_id"],
- }
- else:
- pro_status_data["proc_infos"][index]["bk_agent_id"] = hosts_param["agent_id"]
+ if "hosts" in params:
+ hosts = params["hosts"]
+ else:
+ hosts = []
+ for agent_id in params["agent_id_list"]:
+ bk_cloud_id, ip = agent_id.split(":")
+ hosts.append({"bk_cloud_id": bk_cloud_id, "ip": ip})
+
+ for index, host_info in enumerate(hosts):
+ pro_status_data["proc_infos"][index]["bk_agent_id"] = f"{host_info['bk_cloud_id']}:{host_info['ip']}"
+ pro_status_data["proc_infos"][index]["host"] = {"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]}
    # Append a key that does not exist to simulate extra entries in the response
not_exist_proc_info = copy.deepcopy(pro_status_data["proc_infos"][0])
not_exist_proc_info["bk_agent_id"] = "not_exist_proc_info_agent_id"
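
Editor's note: the reworked mock_get_proc_status accepts either a GSE 1.0 style "hosts" list or a GSE 2.0 "agent_id_list" and keys proc_infos by "{bk_cloud_id}:{ip}". A standalone sketch of that normalization step (the helper name normalize_query_hosts and the fixture values are illustrative, not project code):

from typing import Dict, List


def normalize_query_hosts(params: Dict) -> List[Dict]:
    """Return a host list regardless of whether the caller sent GSE 1.0 or GSE 2.0 params."""
    if "hosts" in params:
        return params["hosts"]
    hosts = []
    for agent_id in params.get("agent_id_list", []):
        # Agent ids produced by these fixtures follow "{bk_cloud_id}:{ip}".
        bk_cloud_id, ip = agent_id.split(":", 1)
        hosts.append({"bk_cloud_id": bk_cloud_id, "ip": ip})
    return hosts


assert normalize_query_hosts({"agent_id_list": ["0:127.0.0.1"]}) == [{"bk_cloud_id": "0", "ip": "127.0.0.1"}]
assert normalize_query_hosts({"hosts": [{"bk_cloud_id": 0, "ip": "127.0.0.1"}]}) == [{"bk_cloud_id": 0, "ip": "127.0.0.1"}]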
diff --git a/apps/mock_data/api_mkd/gse/utils.py b/apps/mock_data/api_mkd/gse/utils.py
index 2f3b4ae89..49141e4ad 100644
--- a/apps/mock_data/api_mkd/gse/utils.py
+++ b/apps/mock_data/api_mkd/gse/utils.py
@@ -29,14 +29,11 @@ class GseApiMockClient(utils.BaseMockClient):
GET_AGENT_NOT_ALIVE_STATUS_RETURN = utils.MockReturn(
return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_AGENT_NOT_ALIVE_STATUS_DATA
)
- DEFAULT_GET_AGENT_INFO_LIST_RETURN = utils.MockReturn(
- return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_get_agent_info_list
- )
- DEFAULT_GET_AGENT_STATE_LIST_RETURN = utils.MockReturn(
- return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_get_agent_state_list
+ DEFAULT_V2_CLUSTER_LIST_AGENT_STATE_RETURN = utils.MockReturn(
+ return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_v2_cluster_list_agent_state_return
)
GET_AGENT_NOT_ALIVE_STATE_LIST_RETURN = utils.MockReturn(
- return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_AGENT_NOT_ALIVE_STATE_LIST_DATA
+ return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_V2_AGENT_NOT_ALIVE_STATE_LIST
)
DEFAULT_GET_PROC_STATUS_RETURN = utils.MockReturn(
return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_get_proc_status
@@ -52,8 +49,7 @@ def __init__(
update_proc_info_return=None,
get_agent_info_return=DEFAULT_GET_AGENT_INFO_RETURN,
get_agent_status_return=DEFAULT_GET_AGENT_STATUS_RETURN,
- get_agent_info_list_return=DEFAULT_GET_AGENT_INFO_LIST_RETURN,
- get_agent_state_list_return=DEFAULT_GET_AGENT_STATE_LIST_RETURN,
+ v2_cluster_list_agent_state_return=DEFAULT_V2_CLUSTER_LIST_AGENT_STATE_RETURN,
):
super().__init__()
self.operate_proc = self.generate_magic_mock(mock_return_obj=operate_proc_return)
@@ -64,5 +60,4 @@ def __init__(
self.update_proc_info = self.generate_magic_mock(mock_return_obj=update_proc_info_return)
self.get_agent_info = self.generate_magic_mock(mock_return_obj=get_agent_info_return)
self.get_agent_status = self.generate_magic_mock(mock_return_obj=get_agent_status_return)
- self.get_agent_info_list = self.generate_magic_mock(mock_return_obj=get_agent_info_list_return)
- self.get_agent_state_list = self.generate_magic_mock(mock_return_obj=get_agent_state_list_return)
+ self.v2_cluster_list_agent_state = self.generate_magic_mock(mock_return_obj=v2_cluster_list_agent_state_return)
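
Editor's note: with the constructor hook above, a test can simulate stopped agents by overriding a single return without subclassing the mock; a usage sketch (assumes the project's test settings are importable):

from apps.mock_data.api_mkd.gse.utils import GseApiMockClient

# Default client: v2_cluster_list_agent_state echoes a RUNNING state per requested agent id.
alive_client = GseApiMockClient()

# Override one hook to simulate stopped agents; all other hooks keep their defaults.
not_alive_client = GseApiMockClient(
    v2_cluster_list_agent_state_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATE_LIST_RETURN
)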
diff --git a/apps/node_man/periodic_tasks/sync_agent_status_task.py b/apps/node_man/periodic_tasks/sync_agent_status_task.py
index 1aa2645ed..378bf0553 100644
--- a/apps/node_man/periodic_tasks/sync_agent_status_task.py
+++ b/apps/node_man/periodic_tasks/sync_agent_status_task.py
@@ -8,25 +8,20 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
+from typing import Dict
+
from celery.task import periodic_task, task
-from django.conf import settings
-from django.db.models import Q
+from apps.adapters.api.gse import GseApiHelper
from apps.node_man import constants
from apps.node_man.models import Host, ProcessStatus
from apps.utils.periodic_task import calculate_countdown
-from common.api import GseApi
from common.log import logger
@task(queue="default", ignore_result=True)
-def update_or_create_host_agent_status(task_id, start, end, has_agent_id=False):
- # GSE接口不支持同时传 {cloud_id}:{ip} + {agent_id} 混合模式,因此需要分开处理
- if has_agent_id:
- host_queryset = Host.objects.exclude(bk_agent_id__isnull=True).exclude(bk_agent_id__exact="")
- else:
- host_queryset = Host.objects.filter(Q(bk_agent_id__isnull=True) | Q(bk_agent_id__exact=""))
- hosts = host_queryset.values("bk_host_id", "bk_agent_id", "bk_cloud_id", "inner_ip", "node_from")[start:end]
+def update_or_create_host_agent_status(task_id, start, end):
+ hosts = Host.objects.values("bk_host_id", "bk_agent_id", "bk_cloud_id", "inner_ip", "node_from")[start:end]
if not hosts:
        # Stop the recursion
return
@@ -38,35 +33,16 @@ def update_or_create_host_agent_status(task_id, start, end, has_agent_id=False):
node_from_map = {}
    # Build the host list used as query parameters
- agent_id_list = []
- query_host = []
+ query_hosts = []
for host in hosts:
- if host["bk_agent_id"] and settings.GSE_VERSION != "V1":
- agent_id = host["bk_agent_id"]
- else:
- agent_id = f"{host['bk_cloud_id']}:{host['inner_ip']}"
+ agent_id = GseApiHelper.get_agent_id(host)
bk_host_id_map[agent_id] = host["bk_host_id"]
node_from_map[agent_id] = host["node_from"]
- query_host.append({"ip": host["inner_ip"], "bk_cloud_id": host["bk_cloud_id"]})
- agent_id_list.append(agent_id)
-
- # 查询agent状态和版本信息
- if settings.GSE_VERSION == "V1":
- agent_status_data = GseApi.get_agent_status({"hosts": query_host})
- agent_info_data = GseApi.get_agent_info({"hosts": query_host})
- else:
- # GSE2.0 接口,补充GSE1.0字段
- agent_status_list = GseApi.get_agent_state_list({"agent_id_list": agent_id_list})
- agent_status_data = {}
- for agent_status in agent_status_list:
- if agent_status.get("status_code") == constants.GseAgentStatusCode.RUNNING.value:
- agent_status["bk_agent_alive"] = constants.BkAgentStatus.ALIVE.value
- else:
- agent_status["bk_agent_alive"] = constants.BkAgentStatus.NOT_ALIVE.value
- agent_status_data[agent_status["bk_agent_id"]] = agent_status
+ query_hosts.append(
+ {"ip": host["inner_ip"], "bk_cloud_id": host["bk_cloud_id"], "bk_agent_id": host["bk_agent_id"]}
+ )
- agent_info_list = GseApi.get_agent_info_list({"agent_id_list": agent_id_list})
- agent_info_data = {agent_info["bk_agent_id"]: agent_info for agent_info in agent_info_list}
+ agent_id__agent_state_info_map: Dict[str, Dict] = GseApiHelper.list_agent_state(query_hosts)
    # Query the ProcessStatus objects of the hosts that need updating
process_status_objs = ProcessStatus.objects.filter(
@@ -84,20 +60,20 @@ def update_or_create_host_agent_status(task_id, start, end, has_agent_id=False):
process_objs = []
need_update_node_from_host = []
to_be_created_status = []
- for key, host_info in agent_status_data.items():
- process_status_id = process_status_id_map.get(bk_host_id_map[key], {}).get("id")
- is_running = host_info["bk_agent_alive"] == constants.BkAgentStatus.ALIVE.value
- version = constants.VERSION_PATTERN.search(agent_info_data[key]["version"])
+ for agent_id, agent_state_info in agent_id__agent_state_info_map.items():
+ process_status_id = process_status_id_map.get(bk_host_id_map[agent_id], {}).get("id")
+ is_running = agent_state_info["bk_agent_alive"] == constants.BkAgentStatus.ALIVE.value
+ version = agent_state_info["version"]
if is_running:
status = constants.ProcStateType.RUNNING
- if node_from_map[key] == constants.NodeFrom.CMDB:
+ if node_from_map[agent_id] == constants.NodeFrom.CMDB:
need_update_node_from_host.append(
- Host(bk_host_id=bk_host_id_map[key], node_from=constants.NodeFrom.NODE_MAN)
+ Host(bk_host_id=bk_host_id_map[agent_id], node_from=constants.NodeFrom.NODE_MAN)
)
else:
            # When the agent is not alive: mark the host as NOT_INSTALLED if it comes from CMDB, otherwise as abnormal
- if node_from_map[key] == constants.NodeFrom.CMDB:
+ if node_from_map[agent_id] == constants.NodeFrom.CMDB:
# NOT_INSTALLED
status = constants.ProcStateType.NOT_INSTALLED
else:
@@ -107,17 +83,11 @@ def update_or_create_host_agent_status(task_id, start, end, has_agent_id=False):
if not process_status_id:
            # Create a ProcessStatus object if one does not exist yet
to_be_created_status.append(
- ProcessStatus(
- bk_host_id=bk_host_id_map[key],
- status=status,
- version=version.group() if version else "",
- )
+ ProcessStatus(bk_host_id=bk_host_id_map[agent_id], status=status, version=version)
)
else:
process_objs.append(
- ProcessStatus(
- id=process_status_id, status=status, version=version.group() if (version and is_running) else ""
- )
+                ProcessStatus(id=process_status_id, status=status, version=version if is_running else "")
)
    # Bulk update status & version
@@ -139,18 +109,14 @@ def sync_agent_status_periodic_task():
"""
task_id = sync_agent_status_periodic_task.request.id
logger.info(f"{task_id} | sync_agent_status_task: Start syncing host status.")
- for (has_agent_id, queryset) in [
- (True, Host.objects.exclude(bk_agent_id__isnull=True).exclude(bk_agent_id__exact="")),
- (False, Host.objects.filter(Q(bk_agent_id__isnull=True) | Q(bk_agent_id__exact=""))),
- ]:
- count = queryset.count()
- for start in range(0, count, constants.QUERY_AGENT_STATUS_HOST_LENS):
- countdown = calculate_countdown(
- count=count / constants.QUERY_AGENT_STATUS_HOST_LENS,
- index=start / constants.QUERY_AGENT_STATUS_HOST_LENS,
- duration=constants.SYNC_AGENT_STATUS_TASK_INTERVAL,
- )
- logger.info(f"{task_id} | sync_agent_status_task after {countdown} seconds")
- update_or_create_host_agent_status.apply_async(
- (task_id, start, start + constants.QUERY_AGENT_STATUS_HOST_LENS, has_agent_id), countdown=countdown
- )
+ count = Host.objects.count()
+ for start in range(0, count, constants.QUERY_AGENT_STATUS_HOST_LENS):
+ countdown = calculate_countdown(
+ count=count / constants.QUERY_AGENT_STATUS_HOST_LENS,
+ index=start / constants.QUERY_AGENT_STATUS_HOST_LENS,
+ duration=constants.SYNC_AGENT_STATUS_TASK_INTERVAL,
+ )
+ logger.info(f"{task_id} | sync_agent_status_task after {countdown} seconds")
+ update_or_create_host_agent_status.apply_async(
+ (task_id, start, start + constants.QUERY_AGENT_STATUS_HOST_LENS), countdown=countdown
+ )
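
Editor's note: the per-version branching that used to live in this task (bk_agent_id on GSE 2.0, "{bk_cloud_id}:{inner_ip}" otherwise, plus the bk_agent_alive back-fill) is now expected to sit behind GseApiHelper.get_agent_id / list_agent_state. A data-shape sketch of that assumed contract, with field names taken from how the loop above consumes the result (values are illustrative):

from typing import Dict, List

# Input: one record per host, as built in query_hosts above.
query_hosts: List[Dict] = [
    {"ip": "127.0.0.1", "bk_cloud_id": 0, "bk_agent_id": ""},        # no agent id -> keyed "0:127.0.0.1"
    {"ip": "127.0.0.2", "bk_cloud_id": 0, "bk_agent_id": "xxxxxx"},  # a V2 helper presumably keys by bk_agent_id
]

# Output: agent id -> state info; the adapter fills GSE 1.0 style fields so the task
# can treat both protocol versions uniformly.
agent_id__agent_state_info_map: Dict[str, Dict] = {
    "0:127.0.0.1": {"bk_agent_alive": 1, "version": "2.1.2"},  # alive flag values are illustrative
    "xxxxxx": {"bk_agent_alive": 0, "version": ""},
}

for agent_id, agent_state_info in agent_id__agent_state_info_map.items():
    print(agent_id, agent_state_info["bk_agent_alive"], agent_state_info["version"])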
diff --git a/apps/node_man/periodic_tasks/sync_cmdb_host.py b/apps/node_man/periodic_tasks/sync_cmdb_host.py
index dcc41aba3..f77e9ea68 100644
--- a/apps/node_man/periodic_tasks/sync_cmdb_host.py
+++ b/apps/node_man/periodic_tasks/sync_cmdb_host.py
@@ -32,7 +32,7 @@ def query_biz_hosts(bk_biz_id: int, bk_host_ids: typing.List[int]) -> typing.Lis
    :return: host list
"""
query_params = {
- "fields": constants.LIST_BIZ_HOSTS_KWARGS,
+ "fields": constants.CC_HOST_FIELDS,
"host_property_filter": {
"condition": "AND",
"rules": [{"field": "bk_host_id", "operator": "in", "value": bk_host_ids}],
@@ -158,9 +158,9 @@ def update_or_create_host_base(biz_id, task_id, cmdb_host_data):
    # Query hosts that already exist in NodeMan
exist_proxy_host_ids: typing.Set[int] = set(
- models.Host.objects.filter(
- bk_host_id__in=bk_host_ids, node_type=constants.NodeType.PROXY
- ).values_list("bk_host_id", flat=True)
+ models.Host.objects.filter(bk_host_id__in=bk_host_ids, node_type=constants.NodeType.PROXY).values_list(
+ "bk_host_id", flat=True
+ )
)
exist_agent_host_ids: typing.Set[int] = set(
models.Host.objects.filter(bk_host_id__in=bk_host_ids)
diff --git a/apps/node_man/periodic_tasks/sync_proc_status_task.py b/apps/node_man/periodic_tasks/sync_proc_status_task.py
index ff8adc585..5b1c42623 100644
--- a/apps/node_man/periodic_tasks/sync_proc_status_task.py
+++ b/apps/node_man/periodic_tasks/sync_proc_status_task.py
@@ -12,110 +12,60 @@
import typing
from celery.task import periodic_task, task
-from django.db.models import Q
-from apps.core.concurrent import controller
+from apps.adapters.api.gse import GseApiHelper
from apps.node_man import constants, tools
from apps.node_man.models import Host, ProcessStatus
-from apps.utils import concurrent
from apps.utils.periodic_task import calculate_countdown
-from common.api import GseApi
from common.log import logger
-def get_version(version_str):
- version = constants.VERSION_PATTERN.search(version_str)
- return version.group() if version else ""
-
-
-@controller.ConcurrentController(
- data_list_name="host_list",
- batch_call_func=concurrent.batch_call,
- get_config_dict_func=lambda: {"limit": constants.QUERY_PROC_STATUS_HOST_LENS},
-)
-def query_proc_status(proc_name, host_list):
- """
- 上云接口测试
- 200: 0.3s-0.4s (0.41s 0.29s 0.33s)
- 500: 0.35s-0.5s (0.34s 0.42s 0.51s)
- 1000: 0.5s-0.6s (0.53s 0.56s 0.61s)
- 2000: 0.9s (0.91s, 0.93s)
- 5000: 2s-4s (2.3s 2.2s 4.1s 4.3s)
- """
- kwargs = {
- "meta": {"namespace": constants.GSE_NAMESPACE, "name": proc_name, "labels": {"proc_name": proc_name}},
- "hosts": host_list,
- "agent_id_list": [host["agent_id"] for host in host_list],
- }
- data = GseApi.get_proc_status(kwargs)
-
- return data.get("proc_infos") or []
-
-
-def proc_statues2host_key__readable_proc_status_map(
- proc_statuses: typing.List[typing.Dict[str, typing.Any]]
-) -> typing.Dict[str, typing.Dict[str, typing.Any]]:
- """
- 将原始的进程状态信息格式化为 主机唯一标识 - 可读的进程状态信息 映射关系
- :param proc_statuses: 进程状态信息
- :return: 主机唯一标识 - 可读的进程状态信息 映射关系
- """
- host_key__readable_proc_status_map: typing.Dict[str, typing.Dict[str, typing.Any]] = {}
- for proc_status in proc_statuses:
- if "bk_agent_id" in proc_status:
- host_key = proc_status["bk_agent_id"]
- else:
- host_key = f"{proc_status['host']['bk_cloud_id']}:{proc_status['host']['ip']}"
- host_key__readable_proc_status_map[host_key] = {
- "version": get_version(proc_status["version"]),
- "status": constants.PLUGIN_STATUS_DICT[proc_status["status"]],
- "is_auto": constants.AutoStateType.AUTO if proc_status["isauto"] else constants.AutoStateType.UNAUTO,
- "name": proc_status["meta"]["name"],
- }
- return host_key__readable_proc_status_map
-
-
@task(queue="default", ignore_result=True)
-def update_or_create_proc_status(task_id, hosts, sync_proc_list, start):
- host_ip_cloud_list = []
- bk_host_id_map = {}
- for host in hosts:
- if host.bk_agent_id:
- agent_id = host.bk_agent_id
- else:
- agent_id = f"{host.bk_cloud_id}:{host.inner_ip}"
- host_ip_cloud_list.append({"ip": host.inner_ip, "bk_cloud_id": host.bk_cloud_id, "agent_id": agent_id})
- bk_host_id_map[agent_id] = host.bk_host_id
+def update_or_create_proc_status(task_id, host_objs, sync_proc_list, start):
+ host_info_list = []
+ agent_id__host_id_map = {}
+ for host_obj in host_objs:
+ host_info = {"ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id, "bk_agent_id": host_obj.bk_agent_id}
+ host_info_list.append(host_info)
+ agent_id__host_id_map[GseApiHelper.get_agent_id(host_info)] = host_obj.bk_host_id
for proc_name in sync_proc_list:
past = time.time()
- proc_statuses = query_proc_status(proc_name=proc_name, host_list=host_ip_cloud_list)
- logger.info(f"this get_proc_status cost {time.time() - past}s")
- host_key__readable_proc_status_map: typing.Dict[
+ agent_id__readable_proc_status_map: typing.Dict[
str, typing.Dict[str, typing.Any]
- ] = proc_statues2host_key__readable_proc_status_map(proc_statuses)
+ ] = GseApiHelper.list_proc_state(
+ namespace=constants.GSE_NAMESPACE,
+ proc_name=proc_name,
+ labels={"proc_name": proc_name},
+ host_info_list=host_info_list,
+ extra_meta_data={},
+ )
+
+ logger.info(f"this get_proc_status cost {time.time() - past}s")
process_status_objs = ProcessStatus.objects.filter(
name=proc_name,
- bk_host_id__in=bk_host_id_map.values(),
+ bk_host_id__in=agent_id__host_id_map.values(),
source_type=ProcessStatus.SourceType.DEFAULT,
proc_type=constants.ProcType.PLUGIN,
is_latest=True,
).values("bk_host_id", "id", "name", "status")
host_proc_key__proc_status_map = {}
- for item in process_status_objs:
- host_proc_key__proc_status_map[f"{item['name']}:{item['bk_host_id']}"] = item
+ for process_status_obj in process_status_objs:
+ host_proc_key__proc_status_map[
+ f"{process_status_obj['name']}:{process_status_obj['bk_host_id']}"
+ ] = process_status_obj
need_update_status = []
need_create_status = []
- for host_key, readable_proc_status in host_key__readable_proc_status_map.items():
- if host_key not in bk_host_id_map:
+ for agent_id, readable_proc_status in agent_id__readable_proc_status_map.items():
+ if agent_id not in agent_id__host_id_map:
continue
db_proc_status = host_proc_key__proc_status_map.get(
- f'{readable_proc_status["name"]}:{bk_host_id_map[host_key]}'
+ f'{readable_proc_status["name"]}:{agent_id__host_id_map[agent_id]}'
)
            # If the proc status stored in DB is MANUAL_STOP while the synced status is terminated,
            # keep the MANUAL_STOP flag so that subscription operations can exempt this host
@@ -144,7 +94,7 @@ def update_or_create_proc_status(task_id, hosts, sync_proc_list, start):
name=readable_proc_status["name"],
source_type=ProcessStatus.SourceType.DEFAULT,
proc_type=constants.ProcType.PLUGIN,
- bk_host_id=bk_host_id_map[host_key],
+ bk_host_id=agent_id__host_id_map[agent_id],
is_latest=True,
)
            # Ignore irrelevant process info
@@ -164,26 +114,22 @@ def update_or_create_proc_status(task_id, hosts, sync_proc_list, start):
def sync_proc_status_periodic_task():
sync_proc_list = tools.PluginV2Tools.fetch_head_plugins()
task_id = sync_proc_status_periodic_task.request.id
- host_queryset_list = [
- Host.objects.exclude(bk_agent_id__isnull=True).exclude(bk_agent_id__exact=""),
- Host.objects.filter(Q(bk_agent_id__isnull=True) | Q(bk_agent_id__exact="")),
- ]
- for host_queryset in host_queryset_list:
- count = host_queryset.count()
- logger.info(f"{task_id} | sync host proc status... host_count={count}.")
-
- for start in range(0, count, constants.QUERY_PROC_STATUS_HOST_LENS):
- countdown = calculate_countdown(
- count=count / constants.QUERY_PROC_STATUS_HOST_LENS,
- index=start / constants.QUERY_PROC_STATUS_HOST_LENS,
- duration=constants.SYNC_PROC_STATUS_TASK_INTERVAL,
- )
- logger.info(f"{task_id} | sync host proc status after {countdown} seconds")
-
- # (task_id, hosts[start: start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start)
- update_or_create_proc_status.apply_async(
- (task_id, host_queryset[start : start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start),
- countdown=countdown,
- )
-
- logger.info(f"{task_id} | sync host proc status complete.")
+ host_queryset = Host.objects.all()
+ count = host_queryset.count()
+ logger.info(f"{task_id} | sync host proc status... host_count={count}.")
+
+ for start in range(0, count, constants.QUERY_PROC_STATUS_HOST_LENS):
+ countdown = calculate_countdown(
+ count=count / constants.QUERY_PROC_STATUS_HOST_LENS,
+ index=start / constants.QUERY_PROC_STATUS_HOST_LENS,
+ duration=constants.SYNC_PROC_STATUS_TASK_INTERVAL,
+ )
+ logger.info(f"{task_id} | sync host proc status after {countdown} seconds")
+
+ # (task_id, hosts[start: start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start)
+ update_or_create_proc_status.apply_async(
+ (task_id, host_queryset[start : start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start),
+ countdown=countdown,
+ )
+
+ logger.info(f"{task_id} | sync host proc status complete.")
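
Editor's note: the removed query_proc_status / proc_statues2host_key__readable_proc_status_map pair produced a mapping from host key to a readable status dict with name/status/version/is_auto; the loop above consumes the same fields from GseApiHelper.list_proc_state, so the adapter presumably returns the same shape keyed by agent id. A data-shape sketch under that assumption (values illustrative):

from typing import Any, Dict

agent_id__readable_proc_status_map: Dict[str, Dict[str, Any]] = {
    "0:127.0.0.1": {
        "name": "processbeat",  # plugin process name, matched against ProcessStatus.name
        "status": "RUNNING",    # already normalized to a ProcStateType-style value
        "version": "1.0.0",
        "is_auto": "AUTO",
    },
}

for agent_id, readable_proc_status in agent_id__readable_proc_status_map.items():
    print(agent_id, readable_proc_status["name"], readable_proc_status["status"])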
diff --git a/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py b/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py
index 939b01850..09db7f2c9 100644
--- a/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py
+++ b/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py
@@ -10,8 +10,10 @@
"""
from unittest.mock import patch
+from django.conf import settings
from django.test import override_settings
+from apps.adapters.api import gse
from apps.mock_data.api_mkd.gse.unit import GSE_PROCESS_VERSION
from apps.mock_data.api_mkd.gse.utils import GseApiMockClient
from apps.mock_data.common_unit.host import (
@@ -25,6 +27,7 @@
update_or_create_host_agent_status,
)
from apps.utils.unittest.testcase import CustomBaseTestCase
+from env.constants import GseVersion
class TestSyncAgentStatus(CustomBaseTestCase):
@@ -36,7 +39,10 @@ def test_update_or_create_host_agent_status_no_host(self):
update_or_create_host_agent_status(None, 0, 1)
self.assertEqual(ProcessStatus.objects.count(), 0)
- @patch("apps.node_man.periodic_tasks.sync_agent_status_task.GseApi", GseApiMockClient())
+ @patch(
+ "apps.node_man.periodic_tasks.sync_agent_status_task.GseApiHelper",
+ gse.get_gse_api_helper(settings.GSE_VERSION)(settings.GSE_VERSION, GseApiMockClient()),
+ )
def test_update_or_create_host_agent_status_alive(self):
host = Host.objects.create(**HOST_MODEL_DATA)
        # Test creating the ProcessStatus object
@@ -48,8 +54,11 @@ def test_update_or_create_host_agent_status_alive(self):
self.assertEqual(host.node_from, constants.NodeFrom.NODE_MAN)
@patch(
- "apps.node_man.periodic_tasks.sync_agent_status_task.GseApi",
- GseApiMockClient(get_agent_status_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATUS_RETURN),
+ "apps.node_man.periodic_tasks.sync_agent_status_task.GseApiHelper",
+ gse.get_gse_api_helper(settings.GSE_VERSION)(
+ settings.GSE_VERSION,
+ GseApiMockClient(get_agent_status_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATUS_RETURN),
+ ),
)
def test_update_or_create_host_agent_status_not_alive(self):
host = Host.objects.create(**HOST_MODEL_DATA)
@@ -65,19 +74,25 @@ def test_update_or_create_host_agent_status_not_alive(self):
process_status = ProcessStatus.objects.get(bk_host_id=host.bk_host_id)
self.assertEqual(process_status.status, constants.ProcStateType.TERMINATED)
- @override_settings(GSE_VERSION="V2")
- @patch("apps.node_man.periodic_tasks.sync_agent_status_task.GseApi", GseApiMockClient())
+ @override_settings(GSE_VERSION=GseVersion.V2.value)
+ @patch(
+ "apps.node_man.periodic_tasks.sync_agent_status_task.GseApiHelper",
+ gse.get_gse_api_helper(GseVersion.V2.value)(GseVersion.V2.value, GseApiMockClient()),
+ )
def test_update_or_create_host_agent_status_alive_gse_v2(self):
host = Host.objects.create(**HOST_MODEL_DATA_WITH_AGENT_ID)
        # Test creating the ProcessStatus object
- update_or_create_host_agent_status(None, 0, 1, has_agent_id=True)
+ update_or_create_host_agent_status(None, 0, 1)
process_status = ProcessStatus.objects.get(bk_host_id=host.bk_host_id)
self.assertEqual(process_status.status, constants.ProcStateType.RUNNING)
- @override_settings(GSE_VERSION="V2")
+ @override_settings(GSE_VERSION=GseVersion.V2.value)
@patch(
- "apps.node_man.periodic_tasks.sync_agent_status_task.GseApi",
- GseApiMockClient(get_agent_state_list_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATE_LIST_RETURN),
+ "apps.node_man.periodic_tasks.sync_agent_status_task.GseApiHelper",
+ gse.get_gse_api_helper(GseVersion.V2.value)(
+ GseVersion.V2.value,
+ GseApiMockClient(v2_cluster_list_agent_state_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATE_LIST_RETURN),
+ ),
)
def test_update_or_create_host_agent_status_not_alive_gse_v2(self):
host = Host.objects.create(**HOST_MODEL_DATA)
diff --git a/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py b/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py
index 5fcde4ee4..92f4ac88d 100644
--- a/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py
+++ b/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py
@@ -12,7 +12,9 @@
import importlib
import mock
+from django.conf import settings
+from apps.adapters.api import gse
from apps.mock_data.api_mkd.gse.unit import GSE_PROCESS_NAME
from apps.mock_data.api_mkd.gse.utils import GseApiMockClient
from apps.mock_data.common_unit.host import (
@@ -27,7 +29,6 @@
class TestSyncProcStatus(CustomBaseTestCase):
-
@classmethod
def setUpClass(cls):
mock.patch("apps.utils.concurrent.batch_call", concurrent.batch_call_serial).start()
@@ -41,7 +42,10 @@ def test_sync_proc_status_periodic_task(self):
Host.objects.create(**HOST_MODEL_DATA)
sync_proc_status_task.sync_proc_status_periodic_task()
- @mock.patch("apps.node_man.periodic_tasks.sync_proc_status_task.GseApi", GseApiMockClient())
+ @mock.patch(
+ "apps.node_man.periodic_tasks.sync_proc_status_task.GseApiHelper",
+ gse.get_gse_api_helper(settings.GSE_VERSION)(settings.GSE_VERSION, GseApiMockClient()),
+ )
def test_update_or_create_proc_status(self, *args, **kwargs):
host = Host.objects.create(**HOST_MODEL_DATA)
        # Test creating a new proc_status
@@ -62,7 +66,10 @@ def test_update_or_create_proc_status(self, *args, **kwargs):
[constants.ProcStateType.MANUAL_STOP, host.bk_host_id, GSE_PROCESS_NAME],
)
- @mock.patch("apps.node_man.periodic_tasks.sync_proc_status_task.GseApi", GseApiMockClient())
+ @mock.patch(
+ "apps.node_man.periodic_tasks.sync_proc_status_task.GseApiHelper",
+ gse.get_gse_api_helper(settings.GSE_VERSION)(settings.GSE_VERSION, GseApiMockClient()),
+ )
def test_update_or_create_proc_status_with_agent_id(self, *args, **kwargs):
host = Host.objects.create(**HOST_MODEL_DATA_WITH_AGENT_ID)
sync_proc_status_task.update_or_create_proc_status(None, [host], [GSE_PROCESS_NAME], 0)
diff --git a/common/api/modules/gse.py b/common/api/modules/gse.py
index 918eba169..ae819e82a 100644
--- a/common/api/modules/gse.py
+++ b/common/api/modules/gse.py
@@ -67,15 +67,33 @@ def __init__(self):
module=self.MODULE,
description="获取Agent状态",
)
- self.get_agent_info_list = DataAPI(
+ self.v2_cluster_list_agent_state = DataAPI(
method="POST",
- url=GSE_APIGATEWAY_ROOT_V2 + "get_agent_info_list/",
+ url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/cluster/list_agent_state/",
module=self.MODULE,
- description="获取Agent版本信息",
+ description="GSE2.0-获取Agent状态",
+ )
+ self.v2_cluster_list_agent_info = DataAPI(
+ method="POST",
+ url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/cluster/list_agent_info/",
+ module=self.MODULE,
+ description="GSE2.0-查询Agent详情列表信息",
+ )
+ self.v2_proc_get_proc_status_v2 = DataAPI(
+ method="POST",
+ url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/proc/get_proc_status_v2/",
+ module=self.MODULE,
+ description="GSE2.0-查询进程状态信息",
+ )
+ self.v2_proc_operate_proc_multi = DataAPI(
+ method="POST",
+ url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/proc/operate_proc_multi/",
+ module=self.MODULE,
+ description="GSE2.0-批量进程操作",
)
- self.get_agent_state_list = DataAPI(
+ self.v2_proc_get_proc_operate_result_v2 = DataAPI(
method="POST",
- url=GSE_APIGATEWAY_ROOT_V2 + "get_agent_state_list/",
+ url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/proc/get_proc_operate_result_v2/",
module=self.MODULE,
- description="GSE2.0 获取Agent状态",
+ description="GSE2.0-查询进程操作结果",
)
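
Editor's note: the renamed attributes are invoked like any other DataAPI; a usage sketch follows (payload shapes mirror what the mocks and removed call sites in this patch send; the namespace and plugin names are placeholders, not confirmed gateway contracts):

from common.api import GseApi  # assumes the project's Django / ESB settings are loaded

# GSE 2.0: agent state, queried by agent id.
agent_states = GseApi.v2_cluster_list_agent_state({"agent_id_list": ["0:127.0.0.1"]})

# GSE 2.0: plugin process status for the same agents.
proc_status = GseApi.v2_proc_get_proc_status_v2(
    {
        "meta": {"namespace": "nodeman", "name": "processbeat", "labels": {"proc_name": "processbeat"}},
        "agent_id_list": ["0:127.0.0.1"],
    }
)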
diff --git a/config/default.py b/config/default.py
index fce67bc43..6b6266c3b 100644
--- a/config/default.py
+++ b/config/default.py
@@ -136,8 +136,6 @@ def redirect_func(request):
BLUEKING_BIZ_ID = 9991001
# BlueKing Job platform version
JOB_VERSION = os.getenv("JOB_VERSION") or "V3"
-# 管控平台平台版本
-GSE_VERSION = os.getenv("GSE_VERSION") or "V1"
# Resource pool business ID
BK_CMDB_RESOURCE_POOL_BIZ_ID = int(os.getenv("BK_CMDB_RESOURCE_POOL_BIZ_ID", 1)) or 1
BK_CMDB_RESOURCE_POOL_BIZ_NAME = "资源池"
@@ -653,6 +651,9 @@ def get_standard_redis_mode(cls, config_redis_mode: str, default: Optional[str]
BKAPP_EE_SOPS_TEMPLATE_ID = os.getenv("BKAPP_EE_SOPS_TEMPLATE_ID")
BKAPP_REQUEST_EE_SOPS_BK_BIZ_ID = os.getenv("BKAPP_REQUEST_EE_SOPS_BK_BIZ_ID")
+
+# GSE (management & control) platform version
+GSE_VERSION = env.GSE_VERSION
# Agent installation path configuration
GSE_AGENT_HOME = os.getenv("BKAPP_GSE_AGENT_HOME") or "/usr/local/gse"
GSE_AGENT_LOG_DIR = os.getenv("BKAPP_GSE_AGENT_LOG_DIR") or "/var/log/gse"
diff --git a/env/__init__.py b/env/__init__.py
index 6ed594962..bd1cb0b43 100644
--- a/env/__init__.py
+++ b/env/__init__.py
@@ -21,6 +21,7 @@
"LOG_TYPE",
"LOG_LEVEL",
"BK_LOG_DIR",
+ "GSE_VERSION",
"ENVIRONMENT",
# esb 访问地址
"BK_COMPONENT_API_URL",
@@ -53,3 +54,10 @@
LOG_LEVEL = get_type_env(key="LOG_LEVEL", default="INFO", _type=str)
# Log directory
BK_LOG_DIR = get_type_env(key="BK_LOG_DIR", default="./../bk_nodeman/logs", _type=str)
+
+
+# ===============================================================================
+# BlueKing GSE (management & control platform)
+# ===============================================================================
+# Platform version
+GSE_VERSION = get_type_env(key="GSE_VERSION", default=constants.GseVersion.V1.value)
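
Editor's note: GSE_VERSION now enters the project through the env package, and config/default.py simply re-exports it as settings.GSE_VERSION. A standalone sketch of the resolution behavior this relies on, assuming get_type_env reads the process environment and falls back to its default (read_gse_version is a stand-in, not project code):

import os


def read_gse_version(default: str = "V1") -> str:
    # Stand-in for get_type_env(key="GSE_VERSION", default=GseVersion.V1.value):
    # read the process environment and fall back to the default when unset or empty.
    return os.getenv("GSE_VERSION") or default


os.environ["GSE_VERSION"] = "V2"
assert read_gse_version() == "V2"

del os.environ["GSE_VERSION"]
assert read_gse_version() == "V1"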
diff --git a/env/constants.py b/env/constants.py
index 4d9d39a81..fb7d6fd21 100644
--- a/env/constants.py
+++ b/env/constants.py
@@ -32,3 +32,12 @@ class LogType(EnhanceEnum):
@classmethod
def _get_member__alias_map(cls) -> Dict[Enum, str]:
return {cls.DEFAULT: "与开发框架保持一致", cls.STDOUT: "标准输出"}
+
+
+class GseVersion(EnhanceEnum):
+ V1 = "V1"
+ V2 = "V2"
+
+ @classmethod
+ def _get_member__alias_map(cls) -> Dict[Enum, str]:
+ return {cls.V1: "V1", cls.V2: "V2"}
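
Editor's note: callers are expected to compare settings.GSE_VERSION against the enum's string values rather than hard-coding "V1"/"V2" (see the override_settings(GSE_VERSION=GseVersion.V2.value) usages in the tests above); a minimal usage sketch:

from env.constants import GseVersion  # part of this patch; requires the project on PYTHONPATH


def is_gse_v2(gse_version: str) -> bool:
    # settings.GSE_VERSION stores the plain string, so compare against .value.
    return gse_version == GseVersion.V2.value


assert is_gse_v2(GseVersion.V2.value) is True
assert is_gse_v2(GseVersion.V1.value) is False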
diff --git a/support-files/kubernetes/helm/bk-nodeman/README.md b/support-files/kubernetes/helm/bk-nodeman/README.md
index 7243af565..2a99d996a 100644
--- a/support-files/kubernetes/helm/bk-nodeman/README.md
+++ b/support-files/kubernetes/helm/bk-nodeman/README.md
@@ -290,9 +290,11 @@ externalRabbitMQ:
| `config.bkIamV3AppCode` | 蓝鲸权限中心相关配置,权限中心 AppCode | `"bk_iam"` |
| `config.bkAppIamResourceApiHost` | 蓝鲸权限中心相关配置,权限中心拉取权限相关资源的访问地址,默认取 `{{ .Values.bkNodemanUrl }}` | `""` |
| `config.bkAppBkNodeApiGateway` | 组件 API 接入地址,节点管理网关地址,用于覆盖 `bkComponentApiUrl` 访问节点管理
⚠️ 配置为 `{{ .Values.bkNodemanApiUrl }`} 由于 JWT 校验问题,会导致 Agent 安装步骤中「安装预制插件」失败 | `""` |
+| `config.bkAppBkGseApiGateway` | GSE API gateway address, used to override `bkComponentApiUrl` when calling the GSE API | `""` |
| `config.bkAppBackendHost` | 节点管理自身模块依赖,后台访问地址,渲染时为空默认取 `{{ .Values.bkNodemanApiUrl }}` | `""` |
| `config.bkAppNodemanCallbackUrl` | 节点管理自身模块依赖,后台内网回调地址,渲染时为空取 `{{ .Values.bkNodemanUrl }}/backend` | `""` |
| `config.bkAppNodemanOuterCallbackUrl` | 节点管理自身模块依赖,后台外网回调地址,渲染时为空取 `{{ .Values.bkNodemanUrl }}/backend` | `""` |
+| `config.gseVersion` | BlueKing GSE platform version, defaults to `V1`; options: `V1`, `V2` | `V1` |
| `config.gseEnableSvrDisCovery` | 蓝鲸管控平台 Agent,AgentXXDir 仅在初次部署有效,后续可以在页面「全局配置」维护。是否启用 GSE 服务探测,默认为 `true` | `true` |
| `config.bkAppGseZkHost` | 蓝鲸管控平台 Agent,zk hosts 信息,host:port,多个 hosts 以 `,` 分隔
⚠️ ZK hosts 将作为 Agent 配置,需要保证 Agent 可访问,所以不能使用 k8s service 信息 进行配置
如果 zk 通过 k8s 部署,建议通过 NodePort 等方式暴露服务,使用 NodeIP:NodePort 进行配置 | `127.0.0.1:2181` |
| `config.bkAppGseZkAuth` | 蓝鲸管控平台 Agent,ZK 认证信息,用户名:密码 | `bkzk:zkpass` |
diff --git a/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml b/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml
index 6ab1b2faa..61a365304 100644
--- a/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml
+++ b/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml
@@ -48,12 +48,16 @@ data:
{{- if .Values.config.bkAppBkNodeApiGateway }}
BKAPP_BK_NODE_APIGATEWAY: "{{ .Values.config.bkAppBkNodeApiGateway }}/"
{{- end }}
+ {{- if .Values.config.bkAppBkGseApiGateway }}
+ BKAPP_BK_GSE_APIGATEWAY: "{{ .Values.config.bkAppBkGseApiGateway }}/"
+ {{- end }}
BK_NODEMAN_URL: "{{ .Values.bkNodemanUrl }}"
BKAPP_BACKEND_HOST: "{{ .Values.config.bkAppBackendHost | default .Values.bkNodemanApiUrl }}"
BKAPP_NODEMAN_CALLBACK_URL: "{{ .Values.config.bkAppNodemanCallbackUrl | default ( printf "%s/%s" .Values.bkNodemanUrl "backend" ) }}"
BKAPP_NODEMAN_OUTER_CALLBACK_URL: "{{ .Values.config.bkAppNodemanOuterCallbackUrl | default ( printf "%s/%s" .Values.bkNodemanUrl "backend" ) }}"
+ GSE_VERSION: "{{ .Values.config.gseVersion }}"
GSE_ENABLE_SVR_DISCOVERY: "{{ .Values.config.gseEnableSvrDisCovery }}"
BKAPP_GSE_ZK_HOST: "{{ .Values.config.bkAppGseZkHost }}"
BKAPP_GSE_ZK_AUTH: "{{ .Values.config.bkAppGseZkAuth }}"
diff --git a/support-files/kubernetes/helm/bk-nodeman/values.yaml b/support-files/kubernetes/helm/bk-nodeman/values.yaml
index 6ae67dfb3..53ab2e299 100644
--- a/support-files/kubernetes/helm/bk-nodeman/values.yaml
+++ b/support-files/kubernetes/helm/bk-nodeman/values.yaml
@@ -308,6 +308,8 @@ config:
##
## 节点管理 API 访问地址,用于覆盖 bkComponentApiUrl 访问节点管理
bkAppBkNodeApiGateway: ""
+  ## GSE API gateway address, used to override bkComponentApiUrl when calling the GSE API
+ bkAppBkGseApiGateway: ""
## 节点管理自身模块依赖
##
@@ -320,6 +322,8 @@ config:
## 蓝鲸管控平台 Agent,AgentXXDir 仅在初次部署有效,后续可以在页面「全局配置」维护
##
+  ## Platform version, defaults to `V1`; options: `V1` `V2`
+ gseVersion: "V1"
## 是否启用 GSE 服务探测,为 `true` 将定期更新默认接入点的 gse svr 信息
gseEnableSvrDisCovery: true
## ZK hosts 信息,host:port,多个 hosts 以 `,` 分隔