Skip to content

Commit

Permalink
feature: GSE 多版本 API 适配 (closed #780)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Jun 22, 2022
1 parent 2760035 commit 404e5d9
Show file tree
Hide file tree
Showing 35 changed files with 704 additions and 945 deletions.
10 changes: 10 additions & 0 deletions apps/adapters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
10 changes: 10 additions & 0 deletions apps/adapters/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
42 changes: 42 additions & 0 deletions apps/adapters/api/gse/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import typing

from django.conf import settings

from common.api import GseApi
from common.log import logger
from env import constants

from .base import GseApiBaseHelper
from .v1 import GseV1ApiHelper
from .v2 import GseV2ApiHelper

GSE_HELPERS: typing.Dict[str, typing.Type[GseApiBaseHelper]] = {
constants.GseVersion.V1.value: GseV1ApiHelper,
constants.GseVersion.V2.value: GseV2ApiHelper,
}


def get_gse_api_helper(gse_version: str) -> typing.Type[GseApiBaseHelper]:
if gse_version not in GSE_HELPERS:
logger.warning(
f"Get GseApiHelper failed: "
f"unsupported gse_version -> {gse_version}, options -> {GSE_HELPERS.values()}; "
f"use default -> {constants.GseVersion.V1.value}"
)
gse_version = constants.GseVersion.V1.value
return GSE_HELPERS[gse_version]


GseApiHelper: GseApiBaseHelper = get_gse_api_helper(settings.GSE_VERSION)(
version=settings.GSE_VERSION, gse_api_obj=GseApi
)
200 changes: 200 additions & 0 deletions apps/adapters/api/gse/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import abc
import typing
from collections import ChainMap

from apps.core.concurrent import controller
from apps.node_man import constants, models
from apps.utils import concurrent
from common.api import GseApi

InfoDict = typing.Dict[str, typing.Any]
InfoDictList = typing.List[InfoDict]
AgentIdInfoMap = typing.Dict[str, InfoDict]


class GseApiBaseHelper(abc.ABC):

version: str = None
gse_api_obj = None

def __init__(self, version: str, gse_api_obj=GseApi):
self.version = version
self.gse_api_obj = gse_api_obj

@abc.abstractmethod
def get_agent_id(self, mixed_types_of_host_info: typing.Union[InfoDict, models.Host]) -> str:
"""
获取 Agent 唯一标识
:param mixed_types_of_host_info: 携带主机信息的混合类型对象
:return:
"""
raise NotImplementedError

@abc.abstractmethod
def _list_proc_state(
self,
namespace: str,
proc_name: str,
labels: InfoDict,
host_info_list: InfoDictList,
extra_meta_data: InfoDict,
**options,
) -> AgentIdInfoMap:
"""
获取进程状态信息
:param namespace: 命名空间
:param proc_name: 进程名称
:param labels: 标签
:param host_info_list: 主机信息列表
:param extra_meta_data: 额外的元数据
:param options: 其他可能需要的参数
:return:
"""
raise NotImplementedError

@abc.abstractmethod
def _list_agent_state(self, host_info_list: InfoDictList) -> AgentIdInfoMap:
"""
获取 Agent 状态信息
:param host_info_list: AgentId - Agent 状态信息映射关系
:return:
"""
raise NotImplementedError

@abc.abstractmethod
def preprocessing_proc_operate_info(self, host_info_list: InfoDictList, proc_operate_info: InfoDict) -> InfoDict:
"""
进程操作信息预处理
:param host_info_list: 主机信息列表
:param proc_operate_info: 进程操作信息
:return:
"""
raise NotImplementedError

@abc.abstractmethod
def _operate_proc_multi(self, proc_operate_req: InfoDictList, **options) -> str:
"""
批量进程操作
:param proc_operate_req: 进程操作信息列表
:param options: 其他可能需要的参数
:return:
"""
raise NotImplementedError

@abc.abstractmethod
def get_proc_operate_result(self, task_id: str) -> InfoDict:
"""
获取进程操作结果
:param task_id: GSE 任务 ID
:return:
"""
raise NotImplementedError

@staticmethod
def get_version(version_str: typing.Optional[str]) -> str:
version_match: typing.Optional[typing.Match] = constants.VERSION_PATTERN.search(version_str or "")
return version_match.group() if version_match else ""

def get_gse_proc_key(
self, mixed_types_of_host_info: typing.Union[InfoDict, models.Host], namespace: str, proc_name: str, **options
) -> str:
"""
获取进程唯一标识
:param mixed_types_of_host_info: 携带主机信息的混合类型对象
:param namespace: 命名空间
:param proc_name: 进程名称
:param options: 其他可能需要的关键字参数
:return:
"""
return f"{self.get_agent_id(mixed_types_of_host_info)}:{namespace}:{proc_name}"

def list_proc_state(
self,
namespace: str,
proc_name: str,
labels: InfoDict,
host_info_list: InfoDictList,
extra_meta_data: InfoDict,
**options,
) -> AgentIdInfoMap:
"""
获取进程状态信息
:param namespace: 命名空间
:param proc_name: 进程名称
:param labels: 标签
:param host_info_list: 主机信息列表
:param extra_meta_data: 额外的元数据
:param options: 其他可能需要的参数
:return:
"""

@controller.ConcurrentController(
data_list_name="_host_info_list",
batch_call_func=concurrent.batch_call,
extend_result=False,
get_config_dict_func=lambda: {"limit": constants.QUERY_PROC_STATUS_HOST_LENS},
)
def get_proc_status_inner(
_namespace: str,
_proc_name: str,
_labels: InfoDict,
_host_info_list: InfoDictList,
_extra_meta_data: InfoDict,
**_options,
) -> typing.List[AgentIdInfoMap]:
return self._list_proc_state(_namespace, _proc_name, _labels, _host_info_list, _extra_meta_data, **_options)

agent_id__proc_status_info_map_list: typing.List[AgentIdInfoMap] = get_proc_status_inner(
_namespace=namespace,
_proc_name=proc_name,
_labels=labels,
_host_info_list=host_info_list,
_extra_meta_data=extra_meta_data,
**options,
)
return dict(ChainMap(*agent_id__proc_status_info_map_list))

def list_agent_state(self, host_info_list: InfoDictList) -> AgentIdInfoMap:
"""
获取 Agent 状态信息
版本 / 状态
:param host_info_list: AgentId - Agent 状态信息映射关系
:return:
"""

@controller.ConcurrentController(
data_list_name="_host_info_list",
batch_call_func=concurrent.batch_call,
extend_result=False,
get_config_dict_func=lambda: {"limit": constants.QUERY_AGENT_STATUS_HOST_LENS},
)
def list_agent_state_inner(_host_info_list: InfoDictList) -> typing.List[AgentIdInfoMap]:
return self._list_agent_state(_host_info_list)

agent_id__agent_state_info_map_list: typing.List[AgentIdInfoMap] = list_agent_state_inner(
_host_info_list=host_info_list
)
return dict(ChainMap(*agent_id__agent_state_info_map_list))

def operate_proc_multi(self, proc_operate_req: InfoDictList, **options) -> str:
"""
批量进程操作
:param proc_operate_req: 进程操作信息列表
:param options: 其他可能需要的参数
:return:
"""
preprocessed_proc_operate_req: InfoDictList = []
for proc_operate_info in proc_operate_req:
hosts = proc_operate_info.pop("hosts")
preprocessed_proc_operate_req.append(self.preprocessing_proc_operate_info(hosts, proc_operate_info))
return self._operate_proc_multi(preprocessed_proc_operate_req, **options)
103 changes: 103 additions & 0 deletions apps/adapters/api/gse/v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import typing
from collections import Mapping

from apps.node_man import constants, models

from . import base


class GseV1ApiHelper(base.GseApiBaseHelper):
def get_agent_id(self, mixed_types_of_host_info: typing.Union[base.InfoDict, models.Host]) -> str:
if isinstance(mixed_types_of_host_info, Mapping):
if "host" in mixed_types_of_host_info:
return self.get_agent_id(mixed_types_of_host_info["host"])
bk_cloud_id: int = mixed_types_of_host_info["bk_cloud_id"]
for attr_name in ["ip", "inner_ip", "bk_host_innerip"]:
ip = mixed_types_of_host_info.get(attr_name, "")
if ip:
break
if not ip:
raise ValueError("can not get ip in dict type host info")
else:
bk_cloud_id: typing.Optional[int] = getattr(mixed_types_of_host_info, "bk_cloud_id")
if bk_cloud_id is None:
raise ValueError("can not get bk_cloud_id in obj type host info")
for attr_name in ["inner_ip"]:
ip = getattr(mixed_types_of_host_info, attr_name, "")
if ip:
break
if not ip:
raise ValueError("can not get ip in obj type host info")

return f"{bk_cloud_id}:{ip}"

def _list_proc_state(
self,
namespace: str,
proc_name: str,
labels: base.InfoDict,
host_info_list: base.InfoDictList,
extra_meta_data: base.InfoDict,
**options,
) -> base.AgentIdInfoMap:
hosts: base.InfoDictList = [
{"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]} for host_info in host_info_list
]
query_params: base.InfoDict = {
"meta": {"namespace": constants.GSE_NAMESPACE, "name": proc_name, "labels": {"proc_name": proc_name}},
"hosts": hosts,
}
# 接口测试
# 200: 0.3s-0.4s (0.41s 0.29s 0.33s)
# 500: 0.35s-0.5s (0.34s 0.42s 0.51s)
# 1000: 0.5s-0.6s (0.53s 0.56s 0.61s)
# 2000: 0.9s (0.91s, 0.93s)
# 5000: 2s-4s (2.3s 2.2s 4.1s 4.3s)
proc_infos: base.InfoDictList = self.gse_api_obj.get_proc_status(query_params).get("proc_infos") or []
agent_id__proc_info_map: base.AgentIdInfoMap = {}
for proc_info in proc_infos:
agent_id__proc_info_map[self.get_agent_id(proc_info)] = {
"version": self.get_version(proc_info.get("version")),
"status": constants.PLUGIN_STATUS_DICT[proc_info["status"]],
"is_auto": constants.AutoStateType.AUTO if proc_info["isauto"] else constants.AutoStateType.UNAUTO,
"name": proc_info["meta"]["name"],
}
return agent_id__proc_info_map

def _list_agent_state(self, host_info_list: base.InfoDictList) -> base.AgentIdInfoMap:
hosts: base.InfoDictList = [
{"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]} for host_info in host_info_list
]
agent_id__info_map: base.AgentIdInfoMap = self.gse_api_obj.get_agent_info({"hosts": hosts})
agent_id__status_map: base.AgentIdInfoMap = self.gse_api_obj.get_agent_status({"hosts": hosts})

for agent_id, agent_info in agent_id__info_map.items():
agent_info.update(agent_id__status_map.get(agent_id) or {})
agent_info["version"] = self.get_version(agent_info.get("version"))
agent_info["bk_agent_alive"] = agent_info.get("bk_agent_alive") or constants.BkAgentStatus.NOT_ALIVE.value

return agent_id__info_map

def preprocessing_proc_operate_info(
self, host_info_list: base.InfoDictList, proc_operate_info: base.InfoDict
) -> base.InfoDict:
proc_operate_info["hosts"] = [
{"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]} for host_info in host_info_list
]
return proc_operate_info

def _operate_proc_multi(self, proc_operate_req: base.InfoDictList, **options) -> str:
return self.gse_api_obj.operate_proc_multi({"proc_operate_req": proc_operate_req})["task_id"]

def get_proc_operate_result(self, task_id: str) -> base.InfoDict:
return self.gse_api_obj.get_proc_operate_result({"task_id": task_id}, raw=True)
Loading

0 comments on commit 404e5d9

Please sign in to comment.